Вот несколько примечаний к вашему сценарию:
1) Вы создаете подкласс StreamCallback и записываете в исходный файл потока, но затем удаляете его позже. StreamCallback предназначен для того, чтобы перезаписать содержимое существующего файла потока. Если вам не нужно этого делать, вы можете использовать InputStreamCallback в качестве базового класса, он не будет принимать outputStream
arg, но в этом случае он вам не понадобится. Вы также используете session.read
в исходном файле потока, а не session.write
.
2) Строка flowFile = session.write(flowFile, "Nothing")
недопустима, потому что session.write
нуждается в OutputStreamCallback или StreamCallback в качестве аргумента (так же, как там, где вы вызываете его с помощью PyStreamCallback ниже). Когда это выдает ошибку, оно поднимается до самого верхнего уровня скрипта, но к тому времени вы создали файл потока и не достигли оператора, который передает flowfiles_list в REL_SUCCESS. Попробуйте добавить try/except
вокруг session.write
, тогда вы можете удалить только что созданный файл потока и вызвать исключение.
3) Если вы хотите прочитать все содержимое файла входящего потока в память (что вы сейчас делаете), затем удалите исходный файл потока и вместо этого создайте из него новые файлы потока, рассмотрите вместо этого использование версии session.read()
, который возвращает InputStream (т.е. не требует InputStreamCallback
). Затем вы можете сохранить содержимое в глобальной переменной и / или передать его в OutputStreamCallback, если хотите что-то записать в созданные файлы потока. Что-то вроде:
inputStream = session.read(originalFlowFile)
text_content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
inputStream.close()
flowfiles_list = []
for n in range(0,5):
flowFile = session.create(originalFlowFile)
if (flowFile != None):
try:
flowFile = session.write(flowFile, PyStreamCallback(text_content))
flowfiles_list.append(flowFile)
except Exception as e:
session.remove(flowFile)
raise
for flow in flowfiles_list:
session.transfer(flow, REL_SUCCESS)
session.remove(originalFlowFile)
Это не включает рефакторинг PyStreamCallback в качестве OutputStreamCallback, который принимает строковый аргумент вместо FlowFile в конструкторе.