Процессор ExecuteScript
имеет прямой доступ к содержимому входящего потокового файла через стандартный API. Вот пример:
def flowFile = session.get();
if (flowFile == null) {
return;
}
// This uses a closure acting as a StreamCallback to do the writing of the new content to the flowfile
flowFile = session.write(flowFile,
{ inputStream, outputStream ->
String line
// This code creates a buffered reader over the existing flowfile input
final BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))
// For each line, write the reversed line to the output
while (line = inReader.readLine()) {
outputStream.write("${line.reverse()}\n".getBytes('UTF-8'))
}
} as StreamCallback)
flowFile = session?.putAttribute(flowFile, "reversed_lines", "true")
session.transfer(flowFile, /*ExecuteScript.*/ REL_SUCCESS)
Перемещать содержимое потокового файла в атрибут опасно, поскольку в NiFi управление атрибутами и памятью содержимого осуществляется по-разному. Более подробное объяснение различий содержится в руководстве Apache NiFi In Depth .