Spring Integration DSL ScatterGather потоковые блоки - PullRequest
0 голосов
/ 08 мая 2018

У меня есть поток интеграции, который выполняет операцию сбора разброса, которая затрагивает несколько конечных точек HTTP, возвращая JSON. Затем объединяет результат в один объект JSON. Поток идет так

@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
             .handle(HeaderPrinter::headerPrinter)
             .enrichHeaders(httpRequestHeaderEnricher())
             .scatterGather(
                scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .applySequence(true),
                gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
            )
            .handle(myService, "handleAggregatedJson")
            .handle(HeaderPrinter::headerPrinter)
            .handle(myOtherService, "handleMyServiceOutput")
            .channel("myFlow.output");
}

Я запускаю поток, используя шлюз, объявленный следующим образом

@MessagingGateway
public interface IMyGateway {

    @Gateway(requestChannel = "myFlow.input", replyChannel = "myFlow.output")
    MyResult startFlow(@Payload String payload, @Header("header1") String header1, @Header("header2") String header2);

}

У меня проблема в том, что весь поток блокируется и время ожидания шлюза. Я установил точки останова в двух вызовах службы IMyService :: handleAggregatedJson и IMyOutherService :: handleMyServiceResult , и они оба работают, но вывод никогда не достигает канала ответа шлюза. Если я удаляю обе последние две операции handle , то поток обычно возвращает результат через шлюз.

Я изучил трассировку стека, пока поток заблокирован, и я вижу, что поток, выполняющий поток, ожидает блокировки

java.lang.Thread.State: WAITING at sun.misc.Unsafe.park (Unsafe.java:-1) в java.util.concurrent.locks.LockSupport.park (LockSupport.java:175) в java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt (AbstractQueuedSynchronizer.java:836) в java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly (AbstractQueuedSynchronizer.java:997) в java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly (AbstractQueuedSynchronizer.java:1304) в java.util.concurrent.CountDownLatch.await (CountDownLatch.java:231) в org.springframework.messaging.core.GenericMessagingTemplate $ TemporaryReplyChannel.receive (GenericMessagingTemplate.java:308) в org.springframework.messaging.core.GenericMessagingTemplate $ TemporaryReplyChannel.receive (GenericMessagingTemplate.java:300) в org.springframework.messaging.core.GenericMessagingTemplate.doReceive (GenericMessagingTemplate.java:201) в org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive (GenericMessagingTemplate.java:234) в org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive (GenericMessagingTemplate.java:47) в org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive (AbstractMessagingTemplate.java:45) в org.springframework.integration.core.MessagingTemplate.sendAndReceive (MessagingTemplate.java:97) в org.springframework.integration.core.MessagingTemplate.sendAndReceive (MessagingTemplate.java:38) в org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive (AbstractMessagingTemplate.java:95) в org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive (AbstractMessagingTemplate.java:85) в org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive (MessagingGatewaySupport.java:487) в org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive (MessagingGatewaySupport.java:461) в org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod (GatewayProxyFactoryBean.java:520) в org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke (GatewayProxyFactoryBean.java:469) в org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke (GatewayProxyFactoryBean.java:460) в org.springframework.aop.framework.ReflectiveMethodInvocation.proceed (ReflectiveMethodInvocation.java:185) в org.springframework.aop.framework.JdkDynamicAopProxy.invoke (JdkDynamicAopProxy.java:212) at com.sun.proxy. $ Proxy116.startFlow (неизвестный источник: -1)

Из того, что я подозреваю, если поток занимает больше X времени, он заблокируется. Я попытался установить канал рандеву между потоком и шлюзом, но он не сработал.

Есть идеи о том, что вызывает проблему тайм-аута?

Добавление: я немного поигрался с кодом и удалил тип возврата на шлюзе, и последний вызов .channel в потоке, похоже, перестал его блокировать.

Следующее работает отлично

@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
             .handle(HeaderPrinter::headerPrinter)
             .enrichHeaders(httpRequestHeaderEnricher())
             .scatterGather(
                scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
                                                             .handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
                                      .applySequence(true),
                gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
            )
            .handle(myService, "handleAggregatedJson")
            .handle(HeaderPrinter::headerPrinter)
            .handle(myOtherService, "handleMyServiceOutput")
            .handle(m -> {
                log.info("Flow completed successfully, payload as expected:" + payload);
            });
}

1 Ответ

0 голосов
/ 08 мая 2018

Интересно, все ли твои

.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")

после сбора вернуть какое-то значение. Типичная ошибка с запросом-ответом, что какой-то шаг в потоке прекращает отвечать с некоторым разумным значением.

UPDATE

Следует рассмотреть возможность удаления явного объявления replyChannel из определения @Gateway, а также удаления .channel("myFlow.output") в конце потока. Таким образом, вы должны получить ответ на заголовок replyChannel. Когда вы настраиваете явный replyChannel, нет никакой гарантии, что у вас не будет другого подписчика на этот канал, который будет «красть» ваши ответные сообщения.

Подробнее см. В справочном руководстве .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...