У нас есть импортер, работающий на мощном многоядерном сервере. Тем не менее, наши маршруты Apache Camel являются однопоточными, что является позором.
Наш импортер [верблюдов] - это программа для одного экземпляра. Как сделать так, чтобы конкретный маршрут обрабатывал сообщения, используя несколько потоков? Сообщения являются атомарными и обрабатываются компонентом, который уже делает это потокобезопасным способом.
Я уже счастлив, если бы мог обрабатывать пакеты (maxMessagesPerPoll) в потоках и иметь время простоя до следующего опроса (в конце концов, это все же лучше, чем последовательная обработка).
Вот один из маршрутов, который я хотел бы сделать многопоточным:
public void onConfigure() throws Exception {
// This is a JPA query which selects all unprocessed modules
String query = RouteQueryHelper.selectNextUnprocessedStaged(ImportAction.IMPORT_MODULES);
from("jpa:com.so.importer.entity.ModuleStageEntity" +
"?consumer.query=" + query +
"&maxMessagesPerPoll=2000" +
"&consumeLockEntity=false" +
"&consumer.delay=1000" +
"&consumeDelete=false")
.transacted().policy("CAMEL_DEFAULT_POLICY")
.bean(moduleImportService) // processes the entity and updates it's status flag
.to("log:import-module?groupInterval=10000")
.routeId("so.route.import-module");
}
Маршрут имеет consumeDelete=false
, потому что вместо этого мы используем свойство статуса для объекта (которое модифицируется и сохраняется). Свойство status также учитывается в consumer.query
.
Мы используем верблюжий вариант 2.17.1 при весенней загрузке (1.3.8.RELEASE) на Java 8.
РЕДАКТИРОВАТЬ 2019-Jan-21: у сущностей есть метод с @Consumed, который толкает сущность на следующий маршрут после ее обработки:
@Consumed
public void gotoNextStatus() {
switch (stageStatus) {
case STAGED: setStageStatus(StageStatus.IMPORTED); break;
case IMPORTED: setStageStatus(StageStatus.RENDERED); break;
case RENDERED: setStageStatus(StageStatus.DONE); break;
}
}