Apache Нифи: Как я могу узнать или проверить, что все файлы потоков были обработаны - PullRequest
2 голосов
/ 20 июня 2020

У меня есть поток в NiFi:

Download file -> UnpackContent -> PutHDFS

После того, как все файлы потоков помещены в HDFS, мне нужно запустить сценарий оболочки.

Как я могу узнать или проверить, что все файлы потоков были обработаны ?

Ответы [ 4 ]

2 голосов
/ 21 июня 2020

@ Антон Букреев

У вас должна быть возможность проверить атрибуты потокового файла для fragment.index и fragment.count.

enter image description here

This values are used to indicate which part in the unpacked series each unpacked flowfile is. This is how you know they are done processing in HDFS. You would need to execute a MergeContent Or Wait/Notify process based on the count before using ExecuteScript. You can also access these in ExecuteScript if you need to return metadata about the part(s) locations in HDFS. I recommend the later as you likely needed to unpack the results for good reason in the final step of the flow.

I have created an UnpackContent Demo template for you which you can find on my GitHub:

https://github.com/steven-dfheinz/NiFi-Templates/blob/master/UnpackContent_Demo.xml

1 голос
/ 21 июня 2020

NiFi предоставляет репозиторий журналов событий происхождения, который включает информацию обо всех действиях, которые произошли с каждым потоковым файлом в вашем кластере.

Подробнее о журналах происхождения и о том, как их использовать, можно узнать здесь: https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#provenance -репозиторий

1 голос
/ 20 июня 2020

Нет хорошего способа сделать это, потому что NiFi - это продукт потоковой обработки - он постоянно читает файлы из одной или нескольких точек, обрабатывает их и отправляет в другую точку.

Так что не существует такой вещи, как завершение обработки всех файлов в NiFi, потому что поток продолжает работать и ждать новых файлов.

Что вы можете сделать, так это запросить репозиторий происхождения и просмотреть указанный c файл потока и проверить, завершил ли он поток .

Итак, я предлагаю сделать следующее:

Если вы знаете, сколько файлов вы ожидали обработать : запросите репозиторий происхождения, чтобы узнать, сколько файлов завершили поток.

Если вы этого не сделаете : запросите последний раз, когда новый файл был записан в HDFS, и если прошло больше X секунд, запустите сценарий.

0 голосов
/ 22 июня 2020

Nifi UnpackContent процессор записывает атрибуты fragment.identifier и fragment.count. Эти атрибуты могут автоматически обрабатываться процессором Nifi MergeContent с помощью Merge Strategy = Defragment. Таким образом, у вас может быть следующий поток:

UnpackContent -> PutHDFS -> AttributesToJSON -> MergeContent -> ... 

AtributesToJSON необходимо для удаления содержимого Flow File и без потери производительности на MergeContent. enter image description here

MergeContent will automatically merge all Flow Files related to single archive. введите описание изображения здесь

В качестве альтернативы вы можете реализовать свой собственный logi c на основе атрибутов fragment.identifier и fragment.count с процессорами Wait / Notify. Но я полагаю, что в вашем случае будет проще вариант MergeContent.

...