Spring интеграции DSL агрегатор не выпускает сообщения - PullRequest
0 голосов
/ 21 февраля 2019

Мы используем приведенный ниже код для агрегации, и мы заметили, что сообщения не передаются с перерывами в последующий поток.Мы включили журнал трассировки для агрегатного пакета

IntegrationFlows
                .from("upstream")
                .log(INFO, g -> "Message Received for Aggregation: "  + g.getPayload())
                .aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy( m -> 1)
                        .expireGroupsUponCompletion(true)
                        .expireGroupsUponTimeout(true)
                        .groupTimeout(30000)
                        .sendPartialResultOnExpiry(true)
                        .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(100, 30000)))
                .log(INFO, g -> "Message released:"  + ((ArrayList) g.getPayload()).size())
                .handle(someService)
                .get();

Этот журнал показывает, что сообщение никогда не завершалось агрегатной функцией

2019-02-20 16:53:44,366 UTC INFO  [org.springframework.jms.listener.DefaultMessageListenerContainer#0-4] org.springframework.integration.handler.LoggingHandler- Message Received for Aggregation: Message1
2019-02-20 16:53:44,366 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-4] org.springframework.integration.aggregator.AggregatingMessageHandler- org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=Message1, headers={jms headers}]
2019-02-20 16:53:44,366 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-4] org.springframework.integration.aggregator.AggregatingMessageHandler- Handling message with correlationKey [1]: GenericMessage [payload=Message1, headers={jms headers}]
2019-02-20 16:53:44,367 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-4] org.springframework.integration.aggregator.AggregatingMessageHandler- Schedule MessageGroup [ SimpleMessageGroup{groupId=1, messages=[GenericMessage [payload=Message1, headers={jms headers}] to 'forceComplete'.

Этот журнал показывает, что сообщение было завершено агрегатной функцией

2019-02-20 16:58:15,386 UTC INFO  [org.springframework.jms.listener.DefaultMessageListenerContainer#0-3] org.springframework.integration.handler.LoggingHandler- Message Received for Aggregation: Message2
2019-02-20 16:58:15,386 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-3] org.springframework.integration.aggregator.AggregatingMessageHandler- org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=Message2, headers={jms headers}]
2019-02-20 16:58:15,386 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-3] org.springframework.integration.aggregator.AggregatingMessageHandler- Handling message with correlationKey [1]: GenericMessage [payload=Message2, headers={jms headers}]
2019-02-20 16:58:15,386 UTC DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-3] org.springframework.integration.aggregator.AggregatingMessageHandler- Schedule MessageGroup [ SimpleMessageGroup{groupId=1, messages=[GenericMessage [payload=Message2, headers={jms headers}] to 'forceComplete'.
2019-02-20 16:58:45,387 UTC DEBUG [task-scheduler-6] org.springframework.integration.aggregator.AggregatingMessageHandler- Cancel 'forceComplete' scheduling for MessageGroup [ SimpleMessageGroup{groupId=1, messages=[GenericMessage [payload=Message2, headers={jms headers}].
2019-02-20 16:58:45,387 UTC DEBUG [task-scheduler-6] org.springframework.integration.aggregator.AggregatingMessageHandler- Completing group with correlationKey [1]
2019-02-20 16:58:45,387 UTC INFO  [task-scheduler-6] org.springframework.integration.handler.LoggingHandler- Message released: 1

Можете ли вы помочь, что отсутствует в коде

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...