У меня есть поток интеграции, который выполняет операцию сбора разброса, которая затрагивает несколько конечных точек HTTP, возвращая JSON. Затем объединяет результат в один объект JSON. Поток идет так
@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
.handle(HeaderPrinter::headerPrinter)
.enrichHeaders(httpRequestHeaderEnricher())
.scatterGather(
scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.applySequence(true),
gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
)
.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")
.channel("myFlow.output");
}
Я запускаю поток, используя шлюз, объявленный следующим образом
@MessagingGateway
public interface IMyGateway {
@Gateway(requestChannel = "myFlow.input", replyChannel = "myFlow.output")
MyResult startFlow(@Payload String payload, @Header("header1") String header1, @Header("header2") String header2);
}
У меня проблема в том, что весь поток блокируется и время ожидания шлюза. Я установил точки останова в двух вызовах службы IMyService :: handleAggregatedJson и IMyOutherService :: handleMyServiceResult , и они оба работают, но вывод никогда не достигает канала ответа шлюза. Если я удаляю обе последние две операции handle , то поток обычно возвращает результат через шлюз.
Я изучил трассировку стека, пока поток заблокирован, и я вижу, что поток, выполняющий поток, ожидает блокировки
java.lang.Thread.State: WAITING at
sun.misc.Unsafe.park (Unsafe.java:-1) в
java.util.concurrent.locks.LockSupport.park (LockSupport.java:175) в
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt (AbstractQueuedSynchronizer.java:836)
в
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly (AbstractQueuedSynchronizer.java:997)
в
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly (AbstractQueuedSynchronizer.java:1304)
в java.util.concurrent.CountDownLatch.await (CountDownLatch.java:231)
в
org.springframework.messaging.core.GenericMessagingTemplate $ TemporaryReplyChannel.receive (GenericMessagingTemplate.java:308)
в
org.springframework.messaging.core.GenericMessagingTemplate $ TemporaryReplyChannel.receive (GenericMessagingTemplate.java:300)
в
org.springframework.messaging.core.GenericMessagingTemplate.doReceive (GenericMessagingTemplate.java:201)
в
org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive (GenericMessagingTemplate.java:234)
в
org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive (GenericMessagingTemplate.java:47)
в
org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive (AbstractMessagingTemplate.java:45)
в
org.springframework.integration.core.MessagingTemplate.sendAndReceive (MessagingTemplate.java:97)
в
org.springframework.integration.core.MessagingTemplate.sendAndReceive (MessagingTemplate.java:38)
в
org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive (AbstractMessagingTemplate.java:95)
в
org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive (AbstractMessagingTemplate.java:85)
в
org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive (MessagingGatewaySupport.java:487)
в
org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive (MessagingGatewaySupport.java:461)
в
org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod (GatewayProxyFactoryBean.java:520)
в
org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke (GatewayProxyFactoryBean.java:469)
в
org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke (GatewayProxyFactoryBean.java:460)
в
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed (ReflectiveMethodInvocation.java:185)
в
org.springframework.aop.framework.JdkDynamicAopProxy.invoke (JdkDynamicAopProxy.java:212)
at com.sun.proxy. $ Proxy116.startFlow (неизвестный источник: -1)
Из того, что я подозреваю, если поток занимает больше X времени, он заблокируется. Я попытался установить канал рандеву между потоком и шлюзом, но он не сработал.
Есть идеи о том, что вызывает проблему тайм-аута?
Добавление: я немного поигрался с кодом и удалил тип возврата на шлюзе, и последний вызов .channel в потоке, похоже, перестал его блокировать.
Следующее работает отлично
@Bean
public IntegrationFlow myFlow(IMyService myService, IMyOtherService myOtherService) {
return f -> f.enrichHeaders(eh -> eh.headerExpression(Headers.PAYLOAD, "payload"))
.handle(HeaderPrinter::headerPrinter)
.enrichHeaders(httpRequestHeaderEnricher())
.scatterGather(
scatterer -> scatterer.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint1"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.recipientFlow(sf -> sf.enrichHeaders(he -> he.header(Headers.DATA_ENDPOINT, "endpoint2"))
.handle(createOutboundHttpGateway(baseUrl, httpRequestFactory)))
.applySequence(true),
gatherer -> gatherer.outputProcessor(MyFlows::aggregateJsonFromMultipleSources)
)
.handle(myService, "handleAggregatedJson")
.handle(HeaderPrinter::headerPrinter)
.handle(myOtherService, "handleMyServiceOutput")
.handle(m -> {
log.info("Flow completed successfully, payload as expected:" + payload);
});
}