См. документацию об агрегаторе .
Агрегатор коррелирует сообщения, используя некоторую стратегию корреляции, и освобождает группу на основе некоторой стратегии выпуска.
Агрегатор объединяет группу связанных сообщений, сопоставляя и сохраняя их, пока группа не будет считаться завершенной. В этот момент агрегатор создает одно сообщение, обрабатывая всю группу, и отправляет агрегированное сообщение в качестве вывода.
Выходная полезная нагрузка - это список сгруппированных полезных нагрузок по умолчанию, но вы можете предоставить собственный обработчик вывода.
EDIT
@SpringBootApplication
public class So55299268Application {
public static void main(String[] args) {
SpringApplication.run(So55299268Application.class, args);
}
@Bean
public IntegrationFlow in1(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination("queue1"))
.channel("aggregator.input")
.get();
}
@Bean
public IntegrationFlow in2(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination("queue2"))
.channel("aggregator.input")
.get();
}
@Bean
public IntegrationFlow aggregator() {
return f -> f
.aggregate(a -> a
.correlationExpression("headers.jms_correlationId")
.releaseExpression("size() == 2")
.expireGroupsUponCompletion(true)
.expireGroupsUponTimeout(true)
.groupTimeout(5_000L)
.discardChannel("discards.input"))
.handle(System.out::println);
}
@Bean
public IntegrationFlow discards() {
return f -> f.handle((p, h) -> {
System.out.println("Aggregation timed out for " + p);
return null;
});
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> {
send(template, "one", "two");
send(template, "three", null);
};
}
private void send(JmsTemplate template, String one, String two) {
template.convertAndSend("queue1", one, m -> {
m.setJMSCorrelationID(one);
return m;
});
if (two != null) {
template.convertAndSend("queue2", two, m -> {
m.setJMSCorrelationID(one);
return m;
});
}
}
}
и
GenericMessage [payload = [two, one], заголовки = {jms_redelivered = false, jms_destination = queue: // queue1, jms_correlationId = one, id = 784535fe-8861-1b22-2cfa-cc2e67763674, приоритет = 4, jms_timestamp = 1553290921442, jms_messageId = ID: Gollum2.local-55540-1553290921241-4: 1: 3: 1: 1, отметка времени = 1553290921457}]
2019-03-22 17: 42: 06.460 INFO 55396 --- [ask-scheduler-1] o.s.i.a.AggregatingMessageHandler: группа сообщений с истекающим сроком действия с корреляциейKey [three]
Время агрегации истекло в течение трех