Нифи - Обработка файлов на основе количества или истекшего времени? - PullRequest
0 голосов
/ 14 декабря 2018

У меня есть следующий поток, ListFile ---> FetchFile --->?ExecuteScript (возможно) ---> Notify

По сути, я хочу перейти к Notify, если

  • Всего потоковых файлов (из файлов выборки), скажем, 200; ИЛИ
  • Прошедшее время (от последнего сигнала), скажем, 3 часа.

Я думаю, что первое условие легко достичь.У меня может быть Groovy скрипт, который может читать число потоковых файлов, если 200 перейти к SUCCESS или ROLLBACK сеанса.

Но я хочу знать, как также проверить, сколько времени прошло для n (число может быть меньше, чем200) потоковых файлов в очереди более 3 часов или около того?

Обновление Вот проблема: у нас есть пакетная обработка (~ 200 файлов и может увеличиться в зависимости от бизнеса в будущем) в настоящее время,У нас есть конвейер NiFi, то есть проверка списка, выборки, базовой проверки контрольной суммы и т. Д., И процесс (вызов SQL), который работает нормально.Согласно бизнесу, в течение дня мы можем вносить исправления в данные, чтобы мы могли получить все или некоторые файлы для «повторной обработки».Это тоже нормально и работает.

Теперь, в соответствии с новыми требованиями, нам нужно построить процесс после завершения этого «пакета».Таким образом, в лучшем случае я могу использовать процессор MergeContent с максимальным значением n для бина и подавать сигнал или уведомлять мой новый процессор.Однако, как объяснено выше, в течение этого дня мы можем обработать несколько или все файлы заново.Так что теперь мой "n" может не совпадать с новым "числом" файлов, которые были обработаны повторно.Следовательно, даже в этом случае, если мы истекли, скажем, 3 часа, то независимо от того, что «n» не равно новому количеству повторно обработанных файлов, я должен уведомить новый процесс о повторном запуске.Следовательно, я ищу n файлов ИЛИ m прошедших часов проверки.

1 Ответ

0 голосов
/ 14 декабря 2018

Я думаю, что это может быть примером проблемы XY - вы пытаетесь решить проблему и считаете, что подсчет числа выбранных файлов или истекшего времени поможет, но этот шаблон обычно не рекомендуется в Apache NiFi, и есть другие решения исходной проблемы.Я бы посоветовал вам более полно описать проблему более высокого уровня, которую вы пытаетесь решить, чтобы увидеть, есть ли лучшее решение.

Я отвечу на вопрос (ни одно из них не является идеальным решением).

  • Вы можете использовать MergeContent процессор с минимальным количеством бинов 200
  • Вы можете использовать ExecuteScript процессор, как вы отметили
  • Вы можете написатьзначение (текущая временная метка), равное DistributedCacheMapServer при выполнении процессора Notify, и проверьте это значение с помощью процессора FetchDistributedCacheMap относительно текущей временной метки и используйте простую инструкцию языка выражений для сравнения значений временной метки

Я думаю, вы также можете прочитать некоторые примеры логики Wait / Notify, потому что создание порогов, таких как "200 входящих потоковых файлов || 3 часа истекшего времени", это то, что делает процессор Wait.

...