Как использовать многопоточность внутри верблюжьего маршрута JPA - PullRequest
0 голосов
/ 14 января 2019

У нас есть импортер, работающий на мощном многоядерном сервере. Тем не менее, наши маршруты Apache Camel являются однопоточными, что является позором.

Наш импортер [верблюдов] - это программа для одного экземпляра. Как сделать так, чтобы конкретный маршрут обрабатывал сообщения, используя несколько потоков? Сообщения являются атомарными и обрабатываются компонентом, который уже делает это потокобезопасным способом. Я уже счастлив, если бы мог обрабатывать пакеты (maxMessagesPerPoll) в потоках и иметь время простоя до следующего опроса (в конце концов, это все же лучше, чем последовательная обработка).

Вот один из маршрутов, который я хотел бы сделать многопоточным:

public void onConfigure() throws Exception {
    // This is a JPA query which selects all unprocessed modules
    String query = RouteQueryHelper.selectNextUnprocessedStaged(ImportAction.IMPORT_MODULES);

    from("jpa:com.so.importer.entity.ModuleStageEntity" +
            "?consumer.query=" + query +
            "&maxMessagesPerPoll=2000" +
            "&consumeLockEntity=false" +
            "&consumer.delay=1000" +
            "&consumeDelete=false")

        .transacted().policy("CAMEL_DEFAULT_POLICY")

        .bean(moduleImportService) // processes the entity and updates it's status flag
        .to("log:import-module?groupInterval=10000")

        .routeId("so.route.import-module");
}

Маршрут имеет consumeDelete=false, потому что вместо этого мы используем свойство статуса для объекта (которое модифицируется и сохраняется). Свойство status также учитывается в consumer.query.

Мы используем верблюжий вариант 2.17.1 при весенней загрузке (1.3.8.RELEASE) на Java 8.

РЕДАКТИРОВАТЬ 2019-Jan-21: у сущностей есть метод с @Consumed, который толкает сущность на следующий маршрут после ее обработки:

@Consumed
public void gotoNextStatus() {
    switch (stageStatus) {
        case STAGED: setStageStatus(StageStatus.IMPORTED); break;
        case IMPORTED: setStageStatus(StageStatus.RENDERED); break;
        case RENDERED: setStageStatus(StageStatus.DONE); break;
    }
}

1 Ответ

0 голосов
/ 16 января 2019

Вы можете ввести некоторую асинхронизацию, отправив свои сообщения промежуточной конечной точке SEDA :

from("jpa:")
...
.to("seda:intermediateStage")

А затем поместите реальную обработку в новый маршрут с N одновременными потребителями SEDA (по умолчанию один):

from("seda:intermediateStage?concurrentConsumers=5")
.process(...)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...