public static void main(String[] args) throws InterruptedException {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
ctx.register(Main.class);
ctx.refresh();
DirectChannel channel1 = ctx.getBean("channel1", DirectChannel.class);
ctx.getBean("channel2", PublishSubscribeChannel.class).subscribe(message ->
System.out.println("Output: " + message));
channel1.send(MessageBuilder.withPayload("p1")
.setHeader(CORRELATION_ID, 1)
.setHeader(SEQUENCE_SIZE,2)
.setHeader(SEQUENCE_NUMBER,1)
.setHeader("a", 1)
.build());
channel1.send(MessageBuilder.withPayload("p2")
.setHeader(CORRELATION_ID, 1)
.setHeader(SEQUENCE_SIZE,2)
.setHeader(SEQUENCE_NUMBER,2)
.setHeader("a", 2)
.build());
}
@Bean
public MessageChannel channel1() {
return MessageChannels.direct().get();
}
@Bean
public MessageChannel channel2() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public IntegrationFlow flow1() {
return IntegrationFlows
.from("channel1")
.aggregate(a -> a
.releaseStrategy(new SequenceSizeReleaseStrategy())
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true))
.channel("channel2")
.get();
}
Вывод: GenericMessage [payload = [p1, p2], заголовки = {sequenceNumber = 2, a = 2, correlationId = 1, id = b5e51041-c967-1bb4-1601-7e468ae28527, sequenceSize = 2, отметка времени = 1580475773518}]
Заголовки "a" и "sequenceNumber" были перезаписаны. Как агрегировать сообщения с одинаковыми заголовками?
Так и должно быть
Вывод: GenericMessage [payload = [p1, p2], заголовки = {sequenceNumber = [1,2] , a = [1, 2], correlationId = 1, id = b5e51041-c967-1bb4-1601-7e468ae28527, sequenceSize = 2, метка времени = 1580475773518}]