Я создаю сквозной поток для использования данных в HDFS, используя Consume Kafka для файлов Json, полученных через поток событий tealium.
В настоящее время я использовал
Consume Kafka -> Evaluate Json Path -> Jolttransform Json -> Merge Content -> Evaluate Json Path -> Update attribute -> PutHDFS ->MoveHDFS
Требуется прочитать данные JSON за весь день спулинга в один файл, ссылаясь на атрибут postdate (скрытая эпоха до отметки времени YYYYMMDDSS
) и ежедневно читать данные, чтобы объединиться в один выходной файл и, наконец, переименовать файл в соответствии с отметкой времени, связанной сПоле POST_DATE для дифференциации ежедневных файлов.
В папке вывода текущей даты должны быть только файлы обработки текущей даты, а все завершенные файлы вывода за более ранние даты должны быть перемещены в другую папку.
Не могли бы вы помочь мне, как работать с MoveHDFSдля рекурсивного поиска в папке hdfs и перемещения готовых выходных файлов, не совпадающих с текущей датой, для перемещения в другую папку.