Передача нескольких потоковых файлов из ExecuteScript в Nifi - PullRequest
0 голосов
/ 21 марта 2019

Я пытаюсь сгенерировать несколько потоковых файлов из одного потокового файла, используя процессор ExecuteScript в python.

Выходные потоковые файлы зависят от одного атрибута для конфигурации и входного потокового файла (содержимое xml).

Я перепробовал много вещей, но всегда заканчиваю ошибкой, такой как:

  • этот файл потока уже помечен для передачи
  • отношение передачи не указано

Ниже последнеговерсия:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import java.io
from org.python.core.util import StringUtil

class PyStreamCallback(StreamCallback):
    def __init__(self, flowFile):
        global matched
        self.parentFlowFile = flowFile
        pass

    def process(self, inputStream, outputStream):
        try:
            text_content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            flowfiles_list = []

            new_xml = "blabla"
            outputStream.write(bytearray(new_xml.encode('utf-8')))

            for n in range(0,5):
                flowFile = session.create(self.parentFlowFile)
                if (flowFile != None):
                    flowFile = session.write(flowFile, "Nothing")
                    flowfiles_list.append(flowFile)

            for flow in flowfiles_list:
                session.transfer(flow, REL_SUCCESS)
        except:
            print('Error inside process')
            raise

originalFlowFile = session.get()
if(originalFlowFile != None):
    try :
        originalFlowFile = session.write(originalFlowFile, PyStreamCallback(originalFlowFile))
        session.remove(originalFlowFile)

    except Exception as e:
        originalFlowFile = session.putAttribute(originalFlowFile,'python_error', str(e))
        session.transfer(originalFlowFile, REL_FAILURE)

Может кто-нибудь сказать мне, что я делаю неправильно и как добиться того, что я хочу сделать?

1 Ответ

0 голосов
/ 21 марта 2019

Вот несколько примечаний к вашему сценарию:

1) Вы создаете подкласс StreamCallback и записываете в исходный файл потока, но затем удаляете его позже. StreamCallback предназначен для того, чтобы перезаписать содержимое существующего файла потока. Если вам не нужно этого делать, вы можете использовать InputStreamCallback в качестве базового класса, он не будет принимать outputStream arg, но в этом случае он вам не понадобится. Вы также используете session.read в исходном файле потока, а не session.write.

2) Строка flowFile = session.write(flowFile, "Nothing") недопустима, потому что session.write нуждается в OutputStreamCallback или StreamCallback в качестве аргумента (так же, как там, где вы вызываете его с помощью PyStreamCallback ниже). Когда это выдает ошибку, оно поднимается до самого верхнего уровня скрипта, но к тому времени вы создали файл потока и не достигли оператора, который передает flowfiles_list в REL_SUCCESS. Попробуйте добавить try/except вокруг session.write, тогда вы можете удалить только что созданный файл потока и вызвать исключение.

3) Если вы хотите прочитать все содержимое файла входящего потока в память (что вы сейчас делаете), затем удалите исходный файл потока и вместо этого создайте из него новые файлы потока, рассмотрите вместо этого использование версии session.read(), который возвращает InputStream (т.е. не требует InputStreamCallback). Затем вы можете сохранить содержимое в глобальной переменной и / или передать его в OutputStreamCallback, если хотите что-то записать в созданные файлы потока. Что-то вроде:

inputStream = session.read(originalFlowFile)
text_content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
inputStream.close()
flowfiles_list = []

for n in range(0,5):
    flowFile = session.create(originalFlowFile)
    if (flowFile != None):
        try:
            flowFile = session.write(flowFile, PyStreamCallback(text_content))
            flowfiles_list.append(flowFile)
        except Exception as e:
            session.remove(flowFile)
            raise

for flow in flowfiles_list:
    session.transfer(flow, REL_SUCCESS)

session.remove(originalFlowFile)

Это не включает рефакторинг PyStreamCallback в качестве OutputStreamCallback, который принимает строковый аргумент вместо FlowFile в конструкторе.

...