Apache Nifi для перемещения файлов в новую папку hdfs для файлов меньше текущей даты - PullRequest
0 голосов
/ 12 октября 2019

Я создаю сквозной поток для использования данных в HDFS, используя Consume Kafka для файлов Json, полученных через поток событий tealium.

В настоящее время я использовал

Consume Kafka -> Evaluate Json Path -> Jolttransform Json -> Merge Content -> Evaluate Json Path -> Update attribute -> PutHDFS ->MoveHDFS

Требуется прочитать данные JSON за весь день спулинга в один файл, ссылаясь на атрибут postdate (скрытая эпоха до отметки времени YYYYMMDDSS) и ежедневно читать данные, чтобы объединиться в один выходной файл и, наконец, переименовать файл в соответствии с отметкой времени, связанной сПоле POST_DATE для дифференциации ежедневных файлов.

В папке вывода текущей даты должны быть только файлы обработки текущей даты, а все завершенные файлы вывода за более ранние даты должны быть перемещены в другую папку.

Не могли бы вы помочь мне, как работать с MoveHDFSдля рекурсивного поиска в папке hdfs и перемещения готовых выходных файлов, не совпадающих с текущей датой, для перемещения в другую папку.

1 Ответ

0 голосов
/ 24 октября 2019

Поток тока работал нормально. Потребить Kafka -> Оценить Json Path -> Jolttransform Json -> Объединить содержимое -> Оценить Json Path -> Обновить атрибут -> PutHDFS ---> Создать файл слияния.

Создать отдельный поток после вышеупомянутого потокасделано, чтобы получить обработанный файл слияния и повторно обработать его с помощью listhdfs-> fethchdfs-> updateattribute-> puthdfs

. В списке listhdfs задайте минимальное время ожидания файла перед его использованием. Это позволит процессу рекурсивно искать файлы, а также использовать атрибут updateat для повторного создания папки в соответствии с родительской папкой для повторного использования файла процесса.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...