Как взять весь контент потока файла в процессоре nifi - PullRequest
0 голосов
/ 26 августа 2018

Я использую nifi для разработки дрейфа данных.В моем потоке используется процессор SelectHiveQL.Вывод (flowFile) из selectHiveQL необходимо перенести в следующий процессор.Какой процессор подходит для загрузки содержимого flowFile и его сохранения в определяемой пользователем переменной, должен использовать ту же переменную в Executescript для манипулирования данными.

Ответы [ 2 ]

0 голосов
/ 29 августа 2018

Вы можете использовать ExtractText для извлечения содержимого вашего потокового файла в атрибут.

В процессоре ExtractText вы должны создать свойство (имя, которое вы дадите этому свойству, будет новым атрибутом вашего потокового файла), а значением свойства будет регулярное выражение (\A.+\Z). По моему опыту, этого регулярного выражения достаточно для захвата всего содержимого потокового файла, хотя я полагаю, что пробег может варьироваться в зависимости от типа содержимого вашего потокового файла.

0 голосов
/ 27 августа 2018

Процессор ExecuteScript имеет прямой доступ к содержимому входящего потокового файла через стандартный API. Вот пример:

def flowFile = session.get();
if (flowFile == null) {
    return;
}

// This uses a closure acting as a StreamCallback to do the writing of the new content to the flowfile
flowFile = session.write(flowFile,
        { inputStream, outputStream ->
            String line

            // This code creates a buffered reader over the existing flowfile input
            final BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))

            // For each line, write the reversed line to the output
            while (line = inReader.readLine()) {
                outputStream.write("${line.reverse()}\n".getBytes('UTF-8'))
            }
        } as StreamCallback)

flowFile = session?.putAttribute(flowFile, "reversed_lines", "true")
session.transfer(flowFile, /*ExecuteScript.*/ REL_SUCCESS)

Перемещать содержимое потокового файла в атрибут опасно, поскольку в NiFi управление атрибутами и памятью содержимого осуществляется по-разному. Более подробное объяснение различий содержится в руководстве Apache NiFi In Depth .

...