Агрегация и корреляция верблюдов - PullRequest
0 голосов
/ 14 января 2019

У меня есть приложение Camel, которое создает извлечение CSV для нескольких учетных записей. Вот поток высокого уровня:

  1. Кварцевая запланированная работа запускает весь процесс
  2. Загрузите все учетные записи и для каждой учетной записи запустите задание, которое извлекает данные (с использованием параллельной обработки).
  3. Процесс извлечения выполняет 2 одновременных запроса (с использованием многоадресной рассылки / агрегатора), которые загружаются в базу данных в памяти
  4. Извлеките и создайте файл CSV

Вот маршруты верблюдов:

from("{{setting.job.scheduling}}")
        .log("JDS Reporting quartz job triggered")
        .split().method(AccountService.class,"getAccounts").parallelProcessing(true) //for each account run the extract job
        .to(ROUTE_URI);

from(ROUTE_URI).id(ROUTE_NAME)
        .log("Running JDS job processor")
        .to("bean:jobProcessor?method=process"); //create the job and kick off the reportProcessor

from("direct:reportProcessor").id(ROUTE_NAME)
    .multicast().parallelProcessing(true).aggregationStrategy(new MultiCastAggregation())
        .to("bean:inventoryProcessor?method=process") //query 1
        .to("bean:productProcessor?method=process") //query 2
    .end()
    .log("Processing complete for jobId: ${header.jobId}, inventoryLoad:${property."+Constants.INVENTORY_LOAD_DONE +"} && productLoad:${property."+Constants.PRODUCT_LOAD_DONE + "}")
    .choice()
        .when(simple("${exchangeProperty."+Constants.INVENTORY_LOAD_DONE +"} == true && ${exchangeProperty."+Constants.PRODUCT_LOAD_DONE + "} == true" ))
            .log("Processing complete for jobId: ${header.jobId}")
            .to("direct:loadDBToFile")
    .endChoice()
    .end();

Мой вопрос касается поведения агрегации на шаге 3 и как корреляция агрегации многоадресной рассылки. Я заметил, что aggregationStrategy иногда вызывается для двух разных заданий и перекрывается, что вызывает проблемы. Есть ли способ сопоставить многоадресную передачу () с помощью идентификатора?

Спасибо.

Код для MultiCastAggregation:

@Slf4j
public class MultiCastAggregation implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        } else {
            if (StringUtils.equals(oldExchange.getIn().getBody(String.class), newExchange.getIn().getBody(String.class))) {
                if (BooleanUtils.isTrue(oldExchange.getProperty(Constants.INVENTORY_LOAD_DONE, Boolean.class))
                    || BooleanUtils.isTrue(newExchange.getProperty(Constants.INVENTORY_LOAD_DONE, Boolean.class))
                    && BooleanUtils.isTrue((oldExchange.getProperty(Constants.PRODUCT_LOAD_DONE, Boolean.class))
                    || BooleanUtils.isTrue(newExchange.getProperty(Constants.PRODUCT_LOAD_DONE, Boolean.class)))) {
                    oldExchange.setProperty(Constants.INVENTORY_LOAD_DONE, true);
                    oldExchange.setProperty(Constants.PRODUCT_LOAD_DONE, true);
                }
            } else {
                log.error("Exchanges are for different JOBS!!!");
            }
            return oldExchange;
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...