После долгих исследований, проб и ошибок и подсказок от NotaJD я нашел решение, которое может сработать (все еще тестирование).На самом деле сделайте это 2 решения, но они отличаются только по типу исполнения.
Информация:
Ради объяснения я буду использовать следующую информацию:
- Таблица содержит 7 миллионов записей (строк)
AggregationStrategyImpl
расширяется AggregationStrategy
со следующим: - Возвращает
List<Object>
в теле обмена - Агрегация
Predicate
завершается, когда List<Object> >= 50000
- Время ожидания агрегирования установлено на
30000
миллисекунды
CustomThreadPool
- это псевдо-реализация класса Camel ThreadPoolBuilder
: - PoolSize: 100
- MaxPoolSize: 50000
- MaxQueueSize: 500
- TimeUnit: МИЛЛИСЕКОНДЫ
- KeepAliveTime: 30000
- Обе реализации автоматически подключаются
Решение 1:
from("quartz2://myGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "createQuery")
Код будет по-прежнему выполняться на кварцевом таймере хронирования (00:01 каждый день)но на этот раз мой QueryTable.class
извлечет правильный запрос для выполнения (вместо SELECT *
, я теперь указал нужные мне столбцы) и задаю it к телу обмена.
.to("jdbc:dataSourceInformix?resetAutoCommit=false&outputType=StreamList").split(body()).streaming()
.bean(TransformRecord.class, "process")
Компонент Camel jdbc
примет запрос от тела обмена, установите для resetAutoCommit
значение false, чтобы не выдавать ошибку Cursor not open
, установитевывод для потоковой передачи и разделения потока выполнения, таким образом, я не буду запрашивать все записи сразу, а вместо этого одну за другой.Каждая извлеченная запись затем преобразуется в правильный POJO через TransformRecord.class
.
.aggregate(constant(true), aggregationStrategyImpl)
.completionPredicate(aggregationStrategyImpl.getCompletionPredicate())
.completionTimeout(aggregationStrategyImpl.getCompletionTimeout())
.to("direct:start-processing")
.end();
На этот раз я использую компонент aggregate
для создания списка записей.aggregationStrategyImpl
содержит логику агрегации, а также предикат завершения и время ожидания, поэтому, когда я достигну определенного количества записей (или произойдет тайм-аут), список будет отправлен на «direct: start-processing».
Подробнее о реализации агрегации в этом источнике Союзники блог и в Apache Camel Aggregate EIP документы.
from("direct:start-processing")
.split(body()).executorService(customThreadPool.build(getContext()))
.bean(AnalyzeData.class, "analyze")
.bean(PersistData.class, "persist")
.end();
Здесь я разделю полученный список ииспользуя пользовательский ThreadPool, я создаю N потоков для анализа и обработки каждой записи в отдельности.Таким образом, я могу обрабатывать мой список параллельно, а не по одному.Я мог бы использовать .split(body()).parallelProcessing()
, но настройки ThreadPool по умолчанию в дальнейшем могут быть неоптимальными.
Подробнее о реализации ThreadPool в Apache Camel Модель потоков docs, Конфигурация ThreadPool примечания и модель потоков Red Hat документы.
Решение 2:
Для этого решения это, в основном, то же самое выполнение, но со следующими изменениями:
// .to("direct:start-processing")
.to("seda:start-processing?size=1&blockWhenFull=true")
.end();
// from("direct:start-processing")
from("seda:start-processing?size=1&blockWhenFull=true")
// continues normally
Что бы это сделать, это отправить список процессам асинхронно , позволяя помещать в память до 1 другого списка в очередь и приостанавливать родительский поток, если очередь заполнена.Таким образом, вместо ожидания обработки списка записей родительский поток вернется и соберет другой пакет записей.Это также означает, что в случае, если маршрут обработки не завершен, новые записи не будут выброшены, и родительский поток будет ожидать, пока он не сможет отправить пакет в очередь SEDA в памяти.
Дополнительноо компоненте SEDA в документах Apache Camel SEDA Component в GitHub и на их сайте
Выводы:
С решением 1 должно потребоваться многодольше завершить, поскольку сначала обрабатываются все данные, а затем собирается больше записей из запроса, но потребление памяти должно быть на FAR меньше, поскольку оно контролируется в предикате агрегации.
В решении 2 это должно быть намного быстреепоскольку он собирает следующий пакет записей из запроса при обработке предыдущего пакета, но потребление памяти будет на порядок выше, поскольку он будет содержать не более 3 списков: один обрабатываемый, один в очереди SEDA и последний пакетсобирается родительским потоком (приостанавливается, когда очередь заполнена).
Я заявил, что все еще тестирую эти решения, потому что с 500k-записями это работает, но я все еще работаю над оптимальными настройками ThreadPool для сервера, на котором это будет реализовано. Я исследовал потоки в Java, но этокажется, что на самом деле не так уж много, кроме архитектуры системы, оперативной памяти и проб и ошибок.