Параллельная обработка большой таблицы SQL с помощью Camel - PullRequest
1 голос
/ 29 марта 2019

Я пытаюсь ежедневно обрабатывать около 7 миллионов строк из таблицы Informix с помощью Apache Camel, но не могу понять, как этого добиться.

Моя первая попытка, которая работала с очень низким набором данных (около 50 тыс. Строк), использовала .split(body()).parallelProcessing() примерно так:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryData").split(body()).parallelProcessing() // Essentially executes a query on my table and returns a list of MyTable.class
.bean(ProcessTable.class, "processData") // Converts each MyTable object into another type of object (NewData.class) for later processing, storing in them in a synchronized list
.end().to("direct:transform-data");

from("direct:transform-data")
.bean(ProcessNewData.class, "processNewData").split(body()).parallelProcessing() // Obtains list
.bean(AnalyzeData.class, "analyze") // Analyzes the data
.bean(PersistData.class, "persist") // Persists the new data on other tables
.end();

Это, конечно, привело к ошибке «OutOfMemory», когда я попробовал ее с 500k строками на .bean(QueryTable.class, "queryData").split(body()).parallelProcessing(), потому что он сначала попытался кэшировать все данные из запроса перед его анализом. Я попытался установить fetchSize на что-то вроде 100, но я получил ту же ошибку, и использование maxRows только дало бы мне количество строк, которое я указал, и проигнорировало остальные.

Моя следующая попытка состояла в использовании одного из компонентов Camel, таких как sql-component и jdbc , и попытке использовать Splitter для обработки каждой строки в отдельных потоках, но у меня возникла та же самая проблема .

SQL:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryDataParams") // Gets the params for my query
.to("sql:SELECT * FROM my_table WHERE date_received BETWEEN :#startDate AND :#endDate?dataSource=dataSourceInformix").split(body()).parallelProcessing()
// The rest would be essentially the same

JDBC:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryString") // Gets the query to execute
.to("jdbc:dataSourceInformix").split(body()).parallelProcessing()

Моя последняя попытка состояла в том, чтобы использовать maxMessagesPerPoll для sql и outputType=StreamList для компонентов jdbc, но, к сожалению, первый обрабатывает только одну строку за раз (а также он должен быть потребителем, чтобы использоваться как таковой) и последний дает мне java.sql.SQLException: Cursor not open исключение.

SQL:

from("sql:" + query +"?dataSource=dataSourceInformix&maxMessagesPerPoll=100") // I need to be able to use the quartz2 component

JDBC:

.to("jdbc:dataSourceInformix?outputType=StreamList").split(body()).streaming() // Throws exception

Конечная цель состоит в том, чтобы иметь возможность обрабатывать миллионы строк, не занимая столько памяти, чтобы предотвратить ошибку «OutOfMemory». Моя идея, если возможно, заключается в следующем:

  1. Создать мой запрос на кварцевом хрон-триггере
  2. Получение и группа N количества результатов
  3. Отправить группу результатов для обработки (в другой ветке), пока получена другая группа
  4. Повторите, пока все данные не были обработаны

Я знаю, что этот вопрос похож на , этот , но ответ не очень помогает моей ситуации. Я также заметил, что в документации по компоненту sql для производителя есть опция outputType=StreamList, но она реализована в версии 2.18 и выше, а у меня версия 2.14.1.

Любая помощь и советы будут чрезвычайно полезны!

Спасибо.

Другая информация: Apache Camel Версия: 2.14.1 База данных: Informix

1 Ответ

1 голос
/ 23 апреля 2019

После долгих исследований, проб и ошибок и подсказок от NotaJD я нашел решение, которое может сработать (все еще тестирование).На самом деле сделайте это 2 решения, но они отличаются только по типу исполнения.

Информация:

Ради объяснения я буду использовать следующую информацию:

  • Таблица содержит 7 миллионов записей (строк)
  • AggregationStrategyImpl расширяется AggregationStrategy со следующим:
    • Возвращает List<Object> в теле обмена
    • Агрегация Predicate завершается, когда List<Object> >= 50000
    • Время ожидания агрегирования установлено на 30000 миллисекунды
  • CustomThreadPool - это псевдо-реализация класса Camel ThreadPoolBuilder:
    • PoolSize: 100
    • MaxPoolSize: 50000
    • MaxQueueSize: 500
    • TimeUnit: МИЛЛИСЕКОНДЫ
    • KeepAliveTime: 30000
  • Обе реализации автоматически подключаются

Решение 1:

from("quartz2://myGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "createQuery")

Код будет по-прежнему выполняться на кварцевом таймере хронирования (00:01 каждый день)но на этот раз мой QueryTable.class извлечет правильный запрос для выполнения (вместо SELECT *, я теперь указал нужные мне столбцы) и задаю it к телу обмена.

.to("jdbc:dataSourceInformix?resetAutoCommit=false&outputType=StreamList").split(body()).streaming()
.bean(TransformRecord.class, "process")

Компонент Camel jdbc примет запрос от тела обмена, установите для resetAutoCommit значение false, чтобы не выдавать ошибку Cursor not open, установитевывод для потоковой передачи и разделения потока выполнения, таким образом, я не буду запрашивать все записи сразу, а вместо этого одну за другой.Каждая извлеченная запись затем преобразуется в правильный POJO через TransformRecord.class.

.aggregate(constant(true), aggregationStrategyImpl)
.completionPredicate(aggregationStrategyImpl.getCompletionPredicate())
.completionTimeout(aggregationStrategyImpl.getCompletionTimeout())
.to("direct:start-processing")
.end();

На этот раз я использую компонент aggregate для создания списка записей.aggregationStrategyImpl содержит логику агрегации, а также предикат завершения и время ожидания, поэтому, когда я достигну определенного количества записей (или произойдет тайм-аут), список будет отправлен на «direct: start-processing».

Подробнее о реализации агрегации в этом источнике Союзники блог и в Apache Camel Aggregate EIP документы.

from("direct:start-processing")
.split(body()).executorService(customThreadPool.build(getContext()))
.bean(AnalyzeData.class, "analyze")
.bean(PersistData.class, "persist")
.end();

Здесь я разделю полученный список ииспользуя пользовательский ThreadPool, я создаю N потоков для анализа и обработки каждой записи в отдельности.Таким образом, я могу обрабатывать мой список параллельно, а не по одному.Я мог бы использовать .split(body()).parallelProcessing(), но настройки ThreadPool по умолчанию в дальнейшем могут быть неоптимальными.

Подробнее о реализации ThreadPool в Apache Camel Модель потоков docs, Конфигурация ThreadPool примечания и модель потоков Red Hat документы.

Решение 2:

Для этого решения это, в основном, то же самое выполнение, но со следующими изменениями:

// .to("direct:start-processing")
.to("seda:start-processing?size=1&blockWhenFull=true")
.end();
// from("direct:start-processing")
from("seda:start-processing?size=1&blockWhenFull=true")
// continues normally

Что бы это сделать, это отправить список процессам асинхронно , позволяя помещать в память до 1 другого списка в очередь и приостанавливать родительский поток, если очередь заполнена.Таким образом, вместо ожидания обработки списка записей родительский поток вернется и соберет другой пакет записей.Это также означает, что в случае, если маршрут обработки не завершен, новые записи не будут выброшены, и родительский поток будет ожидать, пока он не сможет отправить пакет в очередь SEDA в памяти.

Дополнительноо компоненте SEDA в документах Apache Camel SEDA Component в GitHub и на их сайте

Выводы:

С решением 1 должно потребоваться многодольше завершить, поскольку сначала обрабатываются все данные, а затем собирается больше записей из запроса, но потребление памяти должно быть на FAR меньше, поскольку оно контролируется в предикате агрегации.

В решении 2 это должно быть намного быстреепоскольку он собирает следующий пакет записей из запроса при обработке предыдущего пакета, но потребление памяти будет на порядок выше, поскольку он будет содержать не более 3 списков: один обрабатываемый, один в очереди SEDA и последний пакетсобирается родительским потоком (приостанавливается, когда очередь заполнена).

Я заявил, что все еще тестирую эти решения, потому что с 500k-записями это работает, но я все еще работаю над оптимальными настройками ThreadPool для сервера, на котором это будет реализовано. Я исследовал потоки в Java, но этокажется, что на самом деле не так уж много, кроме архитектуры системы, оперативной памяти и проб и ошибок.

...