Допустим, у меня есть 100 потоковых файлов, созданных одним процессором, каждый из которых содержит свою строку. Я хочу получить новый файл потока, который содержит 100 строк. Как я могу это сделать?
Я пробовал процессор MergeContent, но он возвращает мне исходные 100 потоковых файлов.
Текущая конфигурация:
![enter image description here](https://i.stack.imgur.com/do1od.png)
Обновление:
Я отладил вывод MergeContent
, на первом шаге JOIN
все выглядит нормально, поскольку данные 576.34 KB
, которые содержат 100 строк. Но на втором шаге ATTRIBUTES_MODIFIED
кажется, выводится только 1 строка для окончательного результата.
![enter image description here](https://i.stack.imgur.com/QvTHB.png)
Обновление:
Это вся моя процедура.
- Получить от кафки по одному.
- Преобразование сообщения kafka в одну строку строки в одном файле потока.
- Объединение нескольких потоковых файлов в один.
- PutHDFS.
Теперь я застрял на шаге 3, я не могу объединить их один за другим. Меня не волнует порядок или атрибут, мне просто нужно ограничить число.
Обновление:
Я попытался установить correlation attribute
на ${kafka.topic}
, поскольку все файлы потока из одной и той же темы kafka, но они все еще не могут объединиться:
![enter image description here](https://i.stack.imgur.com/h6sKO.png)