В настоящее время я работаю над проектом, созданным с помощью Spring Integration 4.3.14, и мы решили попробовать использовать DSL, но у меня возникают проблемы при попытке интегрировать различные подпотоки.
У меня естьопределены следующие IntegrationFlow:
@Bean
public IntegrationFlow mainFlow() {
return IntegrationFlows
.from(
databaseSource(),
c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
.split()
.log()
.gateway(f -> f
.transform(Transformer::transform)
.transform(AnotherTransformer::transform),
e -> e
.errorChannel("transformErrorChannel"))
.gateway(f -> f
.<MyEntity>handle((p, h) -> this.doSomething(p))
.<MyEntity>handle((p, h) -> this.doOtherThing(p)),
e -> e
.errorChannel("doErrorChannel"))
.channel("nullChannel")
.get();
}
Все transform
и handle
вызванные методы являются недействительными и возвращают ненулевые значения.Основная причина, по которой мы выбрали этот подход, состоит в том, чтобы иметь два разных канала для обработки ошибок в зависимости от того, в какой части потока они произошли, поэтому мы можем действовать соответственно.
Тем не менее, когда я пытаюсь запустить этот код иЯ вставляю запись в БД, и поллер забирает ее, она никогда не выходит за пределы первого шлюза.У меня просто есть такие строки журнала:
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@1863292e
2018-06-06 11:43:58.864 INFO 6492 --- [ask-scheduler-1] c.e.transformation.Transformer : Performing transformation.
2018-06-06 11:43:58.864 INFO 6492 --- [ask-scheduler-1] c.e.transformation.AnotherTransformer : Performing another transformation.
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : started org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@433a796
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : started org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
Кажется очевидным, что сообщение действительно приходит на первый шлюз, но, очевидно, оно не передается на второй шлюз.
Во время запуска явидно, что SI создает два подпотока (# 0 и # 1) и два канала для каждого (по-моему, по одному для каждой операции) с 1 подписчиком на каждый.
Я также попытался изменить определение на следующее:
@Bean
public IntegrationFlow getRecords() {
return IntegrationFlows
.from(
databaseSource(),
c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
.split()
.log()
.gateway(f -> f
.transform(Transformer::transform)
.transform(AnotherTransformer::transform),
e -> e
.errorChannel("transformErrorChannel")
.replyChannel("doThingsChannel"))
.get();
}
@Bean
public IntegrationFlow doThings() {
return IntegrationFlows
.from(
"doThingsChannel")
.gateway(f -> f
.<MyEntity>handle((p, h) -> this.doSomehting(p))
.<MyEntity>handle((p, h) -> this.doOtherThing(p)),
e -> e
.errorChannel("doErrorChannel"))
.get();
}
Но в итоге возникла та же проблема: либо установить replyChannel
на GatewayEndpointSpec
, либо добавить явный поток .channel
в getRecords
после шлюза.