Мне очень сложно понять, как работает процессор записи слияния.
У меня есть поток, который генерирует за 1 час около 70 000 файлов потоков. Мне нужно, чтобы все эти потоковые файлы были объединены в один уникальный потоковый файл. Flow-файлы похожи на CSV, поэтому это довольно просто. Что непросто сделать это за один раз.
На данный момент у меня есть 2 процессора MergeRecord один за другим.
Первый настроен следующим образом:
- Мин. / Макс. Запись: 1000/2000
- Макс. Возраст бункера: 60 с
- Номер бункера: 2
- График работы: 10 с
Последний настроен следующим образом:
- Мин. / Макс. Запись: 70/2000
- Максимальный возраст бункера: 6000 с
- Номер бункера: 2
- Расписание выполнения: 60 с
Каждая входящая очередь использует стратегию балансировки одного узла.
Что я ожидаю:
Первая запись MergeRecord будет объединена запись в соответствии с:
- Достигнута минимальная запись
- Если корзина заполнена
- Если достигнут возраст корзины
Таким образом, я получил около 70 объединенных файлов потоков.
Вторая MergeRecord объединит запись в соответствии с:
- Достигнута минимальная запись
- Если размер корзины полный
- Если достигнут возраст бункера
Таким образом, я попал в 1 файл потока.
Что у меня есть:
Я попытался поиграть с параметрами процессора , но похоже, что этот процессор «только» опережает параметры расписания запуска. Я должен установить для него высокое значение, если я только один файл потока в конце моего потока. Так что процессор будет срабатывать только при заполнении очереди. Проблема в том, что 70 000 потоковых файлов в очереди часто приводят меня к ошибке OOM или Java ошибке размера кучи.
Есть какие-нибудь советы по настройке процессора mergeRecord?
Спасибо,
PS: Я работаю в кластере из 3 узлов.