Я не уверен, какое требование мешает вам записать файл, полученный с FTP, прямо в нужное место HDFS, или это «запись n файлов в HDFS с началом .
» имя файла, а затем переименовать все, когда достигнут некоторый определенный порог "сценарий.
ListHDFS
не принимает никаких входящих отношений, потому что они не должны запускаться входящим событием, а скорее по расписанию таймера / CRON. Каждый раз, когда он запускается, он создает n потоковых файлов, где каждый ссылается на файл HDFS, который был обнаружен для записи в файловую систему с момента последнего выполнения. Для этого процессор хранит локальное состояние.
В этом случае ваши сегменты потока не нужно подключать. У вас будет «потоковый сегмент A», который выполняет запись FTP -> HDFS (GetFTP -> PutHDFS
), и у вас будет независимый «потоковый сегмент B», который перечисляет каталог HDFS, читает файловые дескрипторы (но не содержимое файл, если только вы не используете FetchHDFS
) и перемещает их (ListHDFS -> MoveHDFS
). Процессор ListHDFS
будет работать постоянно, но если он не обнаружит новые файлы во время выполнения, он просто выдаст и выполнит no-op. Как только процессор PutHDFS
завершит задачу записи файла в файловую систему HDFS, при следующем выполнении ListHDFS
он обнаружит этот файл и сгенерирует потоковый файл, описывающий его.
Вы можете настроить расписание по своему вкусу, но в целом это очень распространенная схема в потоках NiFi.