Я написал скрипт Python / Jython для запуска в процессоре NiFi ExecuteScript
для анализа моего недействительного документа JSON. Я написал сценарий ниже на основе сценария в этом вопросе и фантастической книге Мэтта Берджесса c cookbook , но он не возвращает несколько потоковых файлов. Вместо этого он возвращает файл входного потока с примененными исправлениями регулярных выражений, но только как один файл. Что мне нужно изменить, чтобы вернуть 1 файл потока для каждой строки в l oop?
script
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import re
# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
# regex out invalid escapes -- WORKS
text = re.sub(r"[(\\)]", "", text)
# split out each line into a separate file -- DOES NOT WORK
for t in text.splitlines():
outputStream.write(t)
# end class
flowFile = session.get()
if(flowFile != None):
try:
flowFile = session.write(flowFile, PyStreamCallback())
session.transfer(flowFile, REL_SUCCESS)
except Exception as e:
log.error('Something went wrong', e)
session.transfer(flowFile, REL_FAILURE)
# implicit return at the end
json - целью является каждая строка == один файл потока
{"fruit":"apple", "vegetable":"celery", "location":{"country":"nor\\way", "city":"oslo"}, "color":"blue"}
{"fruit":"cherry", "vegetable":"kale", "location":{"country":"france", "city":"calais"}, "color":"green"}
{"fruit":"peach", "vegetable":"peas", "location":{"country":"united\\kingdom", "city":"london"}, "color":"yellow"}
ETA Добавлен session.create()
и удален исходный файл потока с session.remove(flowFile)
на это , но NiFi говорит, что flowFile не определен?
# imports not changed
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
text = re.sub(r"[(\\)]", "", text)
flowfiles_list = []
for t in text.splitlines():
flowFile = session.create()
if (flowFile != None):
flowFile = session.write(flowfile, t)
flowfiles_list.append(flowFile)
for flow in flowfiles_list:
session.transfer(flow, REL_SUCCESS)
originalFlowFile = session.get()
if(originalFlowFile != None):
# NiFi says flowFile not defined
originalFlowFile = session.write(flowFile, PyStreamCallback())
session.remove(originalFlowFile)