Мы используем приведенный ниже код для агрегации, и мы заметили, что сообщения не передаются с перерывами в последующий поток.Мы включили журнал трассировки для агрегатного пакета
IntegrationFlows
.from("upstream")
.log(INFO, g -> "Message Received for Aggregation: " + g.getPayload())
.aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy( m -> 1)
.expireGroupsUponCompletion(true)
.expireGroupsUponTimeout(true)
.groupTimeout(30000)
.sendPartialResultOnExpiry(true)
.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(100, 30000)))
.log(INFO, g -> "Message released:" + ((ArrayList) g.getPayload()).size())
.handle(someService)
.get();
Этот журнал показывает, что сообщение никогда не завершалось агрегатной функцией
2019-02-20 16:53:44,366 UTC INFO [org.springframework.jms.listener.DefaultMessageListenerContainer#0-4] org.springframework.integration.handler.LoggingHandler- Message Received for Aggregation: Message1
2019-02-20 16:53:44,366 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-4] org.springframework.integration.aggregator.AggregatingMessageHandler- org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=Message1, headers={jms headers}]
2019-02-20 16:53:44,366 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-4] org.springframework.integration.aggregator.AggregatingMessageHandler- Handling message with correlationKey [1]: GenericMessage [payload=Message1, headers={jms headers}]
2019-02-20 16:53:44,367 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-4] org.springframework.integration.aggregator.AggregatingMessageHandler- Schedule MessageGroup [ SimpleMessageGroup{groupId=1, messages=[GenericMessage [payload=Message1, headers={jms headers}] to 'forceComplete'.
Этот журнал показывает, что сообщение было завершено агрегатной функцией
2019-02-20 16:58:15,386 UTC INFO [org.springframework.jms.listener.DefaultMessageListenerContainer#0-3] org.springframework.integration.handler.LoggingHandler- Message Received for Aggregation: Message2
2019-02-20 16:58:15,386 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-3] org.springframework.integration.aggregator.AggregatingMessageHandler- org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=Message2, headers={jms headers}]
2019-02-20 16:58:15,386 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-3] org.springframework.integration.aggregator.AggregatingMessageHandler- Handling message with correlationKey [1]: GenericMessage [payload=Message2, headers={jms headers}]
2019-02-20 16:58:15,386 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-3] org.springframework.integration.aggregator.AggregatingMessageHandler- Schedule MessageGroup [ SimpleMessageGroup{groupId=1, messages=[GenericMessage [payload=Message2, headers={jms headers}] to 'forceComplete'.
2019-02-20 16:58:45,387 UTC DEBUG [task-scheduler-6] org.springframework.integration.aggregator.AggregatingMessageHandler- Cancel 'forceComplete' scheduling for MessageGroup [ SimpleMessageGroup{groupId=1, messages=[GenericMessage [payload=Message2, headers={jms headers}].
2019-02-20 16:58:45,387 UTC DEBUG [task-scheduler-6] org.springframework.integration.aggregator.AggregatingMessageHandler- Completing group with correlationKey [1]
2019-02-20 16:58:45,387 UTC INFO [task-scheduler-6] org.springframework.integration.handler.LoggingHandler- Message released: 1
Можете ли вы помочь, что отсутствует в коде