У меня есть приложение Camel, которое создает извлечение CSV для нескольких учетных записей. Вот поток высокого уровня:
- Кварцевая запланированная работа запускает весь процесс
- Загрузите все учетные записи и для каждой учетной записи запустите задание, которое извлекает данные (с использованием параллельной обработки).
- Процесс извлечения выполняет 2 одновременных запроса (с использованием многоадресной рассылки / агрегатора), которые загружаются в базу данных в памяти
- Извлеките и создайте файл CSV
Вот маршруты верблюдов:
from("{{setting.job.scheduling}}")
.log("JDS Reporting quartz job triggered")
.split().method(AccountService.class,"getAccounts").parallelProcessing(true) //for each account run the extract job
.to(ROUTE_URI);
from(ROUTE_URI).id(ROUTE_NAME)
.log("Running JDS job processor")
.to("bean:jobProcessor?method=process"); //create the job and kick off the reportProcessor
from("direct:reportProcessor").id(ROUTE_NAME)
.multicast().parallelProcessing(true).aggregationStrategy(new MultiCastAggregation())
.to("bean:inventoryProcessor?method=process") //query 1
.to("bean:productProcessor?method=process") //query 2
.end()
.log("Processing complete for jobId: ${header.jobId}, inventoryLoad:${property."+Constants.INVENTORY_LOAD_DONE +"} && productLoad:${property."+Constants.PRODUCT_LOAD_DONE + "}")
.choice()
.when(simple("${exchangeProperty."+Constants.INVENTORY_LOAD_DONE +"} == true && ${exchangeProperty."+Constants.PRODUCT_LOAD_DONE + "} == true" ))
.log("Processing complete for jobId: ${header.jobId}")
.to("direct:loadDBToFile")
.endChoice()
.end();
Мой вопрос касается поведения агрегации на шаге 3 и как корреляция агрегации многоадресной рассылки. Я заметил, что aggregationStrategy иногда вызывается для двух разных заданий и перекрывается, что вызывает проблемы. Есть ли способ сопоставить многоадресную передачу () с помощью идентификатора?
Спасибо.
Код для MultiCastAggregation:
@Slf4j
public class MultiCastAggregation implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
} else {
if (StringUtils.equals(oldExchange.getIn().getBody(String.class), newExchange.getIn().getBody(String.class))) {
if (BooleanUtils.isTrue(oldExchange.getProperty(Constants.INVENTORY_LOAD_DONE, Boolean.class))
|| BooleanUtils.isTrue(newExchange.getProperty(Constants.INVENTORY_LOAD_DONE, Boolean.class))
&& BooleanUtils.isTrue((oldExchange.getProperty(Constants.PRODUCT_LOAD_DONE, Boolean.class))
|| BooleanUtils.isTrue(newExchange.getProperty(Constants.PRODUCT_LOAD_DONE, Boolean.class)))) {
oldExchange.setProperty(Constants.INVENTORY_LOAD_DONE, true);
oldExchange.setProperty(Constants.PRODUCT_LOAD_DONE, true);
}
} else {
log.error("Exchanges are for different JOBS!!!");
}
return oldExchange;
}
}
}