Синхронизировать группы процессов NiFi или потоки, которые не / не могут соединиться? - PullRequest
0 голосов
/ 04 мая 2018

Как и в случае с вопросом, есть ли способ синхронизации групп процессов или конвейеров NiFi, которые не / не могут подключаться в пользовательском интерфейсе?

Например. У меня есть процесс, в котором я хочу getFTP->putHDFS->moveHDFS (который в итоге становится getFTP->putHDFS->listHDFS->moveHDFS, см. https://stackoverflow.com/a/50166151/8236733). Однако listHDFS, похоже, не принимает никаких входящих соединений. Попытка сделать что-то с группами процессов, такими как P1{getFTP->putHDFS->outport}->P2{inport->listHDFS->moveHDFS} также сталкивается с той же проблемой (listHDFS не может принимать какие-либо входящие соединения). Мы не хотим moveHDFS, прежде чем мы вообще ничего не получим от getFTP, но, учитывая вышеизложенное, не вижу, как эти действия могут быть синхронизированы, чтобы происходить в правильном порядке.

Новичок в NiFi, но я думаю, что это обычный случай использования, и должен быть какой-то способ сделать это, которого мне не хватает. Совет в этом был бы оценен. Спасибо.

1 Ответ

0 голосов
/ 05 мая 2018

Я не уверен, какое требование мешает вам записать файл, полученный с FTP, прямо в нужное место HDFS, или это «запись n файлов в HDFS с началом .» имя файла, а затем переименовать все, когда достигнут некоторый определенный порог "сценарий.

ListHDFS не принимает никаких входящих отношений, потому что они не должны запускаться входящим событием, а скорее по расписанию таймера / CRON. Каждый раз, когда он запускается, он создает n потоковых файлов, где каждый ссылается на файл HDFS, который был обнаружен для записи в файловую систему с момента последнего выполнения. Для этого процессор хранит локальное состояние.

В этом случае ваши сегменты потока не нужно подключать. У вас будет «потоковый сегмент A», который выполняет запись FTP -> HDFS (GetFTP -> PutHDFS), и у вас будет независимый «потоковый сегмент B», который перечисляет каталог HDFS, читает файловые дескрипторы (но не содержимое файл, если только вы не используете FetchHDFS) и перемещает их (ListHDFS -> MoveHDFS). Процессор ListHDFS будет работать постоянно, но если он не обнаружит новые файлы во время выполнения, он просто выдаст и выполнит no-op. Как только процессор PutHDFS завершит задачу записи файла в файловую систему HDFS, при следующем выполнении ListHDFS он обнаружит этот файл и сгенерирует потоковый файл, описывающий его.

Вы можете настроить расписание по своему вкусу, но в целом это очень распространенная схема в потоках NiFi.

...