Возврат нескольких файлов из Python ExecuteScript в NiFi - PullRequest
0 голосов
/ 25 февраля 2020

Я написал скрипт Python / Jython для запуска в процессоре NiFi ExecuteScript для анализа моего недействительного документа JSON. Я написал сценарий ниже на основе сценария в этом вопросе и фантастической книге Мэтта Берджесса c cookbook , но он не возвращает несколько потоковых файлов. Вместо этого он возвращает файл входного потока с примененными исправлениями регулярных выражений, но только как один файл. Что мне нужно изменить, чтобы вернуть 1 файл потока для каждой строки в l oop?

script

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import re

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    # regex out invalid escapes -- WORKS
    text = re.sub(r"[(\\)]", "", text)
    # split out each line into a separate file -- DOES NOT WORK
    for t in text.splitlines():
        outputStream.write(t)
# end class
flowFile = session.get()
if(flowFile != None):
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, REL_SUCCESS)
    except Exception as e:
        log.error('Something went wrong', e)
        session.transfer(flowFile, REL_FAILURE)
# implicit return at the end

json - целью является каждая строка == один файл потока

{"fruit":"apple", "vegetable":"celery", "location":{"country":"nor\\way", "city":"oslo"}, "color":"blue"}
{"fruit":"cherry", "vegetable":"kale", "location":{"country":"france", "city":"calais"}, "color":"green"}
{"fruit":"peach", "vegetable":"peas", "location":{"country":"united\\kingdom", "city":"london"}, "color":"yellow"}

ETA Добавлен session.create() и удален исходный файл потока с session.remove(flowFile) на это , но NiFi говорит, что flowFile не определен?

# imports not changed 
class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    text = re.sub(r"[(\\)]", "", text)
    flowfiles_list = []
    for t in text.splitlines():
        flowFile = session.create()
        if (flowFile != None):
            flowFile = session.write(flowfile, t)
            flowfiles_list.append(flowFile)
    for flow in flowfiles_list:
        session.transfer(flow, REL_SUCCESS)

originalFlowFile = session.get()
if(originalFlowFile != None):
    # NiFi says flowFile not defined
    originalFlowFile = session.write(flowFile, PyStreamCallback()) 
    session.remove(originalFlowFile)

1 Ответ

0 голосов
/ 25 февраля 2020
...