объединение потоковых файлов в указанном порядке - PullRequest
0 голосов
/ 11 января 2019

Я новичок в nifi (используется версия 1.8.0). У меня есть требование потреблять сообщения кафки, которые содержат позицию транспортного средства в форме широты, долготы за сообщение. Поскольку каждое сообщение будет поступать в виде файла потока, мне нужно объединить все эти файлы потока и создать файл json, содержащий полный путь, по которому следует транспортное средство. Я использую процессор потребителя kafka для подписки на сообщения, обновляю процессор атрибутов (добавлены следующие свойства: filename: $ {getStateValue ("seq")}, seq: $ {getStateValue ("seq"): plus (1)}) для добавления порядковый номер в качестве имени файла (например, имя файла равно 1,2, 3 и т. д.) и поместите файловый процессор для записи этих файлов в указанный каталог. Я настроил очередь приоритетов FIFO для всех успешных отношений между вышеупомянутыми процессорами. Однажды я получил все сообщения, которые хочу объединить со всеми файлами потоков. Для этого я знаю, что мне нужно использовать get file, установить порядок, объединить содержимое (стратегия объединения: алгоритм упаковки бинов, формат объединения: двоичная конкатенация) и обработчик файлов put соответственно. Мой подход правильный? Как мне установить, что объединение файлов происходит в последовательности их имен, так как имя файла - это следующий номер. Что я должен поставить в атрибуте порядка в принудительном порядке обработчик? Что следует добавить в идентификатор группы? Можно ли добавить дополнительные настраиваемые поля в обработчик принудительного порядка?

1 Ответ

0 голосов
/ 11 января 2019

EnforceOrder процессор документация

1. Идентификатор группы

  • Это свойство оценивается для каждого потокового файла для вашего случая. Используйте UpdateAttribute Процессор, добавьте атрибут group_name и используйте тот же атрибут ${group_name} в идентификаторе группы Значение свойства.

2. Атрибут заказа

  • Язык выражений не поддерживается.

  • Вы можете использовать filename (или) создать new attribute в UpdateAttribute процессора и используйте то же имя атрибута в вашем Order Attribute значение свойства.

Для справки / использования Принудительный заказ Использование процессора Этот шаблон и загрузка в ваш экземпляр NiFi.

...