Шлюз не устанавливает заголовок replyChannel - PullRequest
0 голосов
/ 06 июня 2018

В настоящее время я работаю над проектом, созданным с помощью Spring Integration 4.3.14, и мы решили попробовать использовать DSL, но у меня возникают проблемы при попытке интегрировать различные подпотоки.

У меня естьопределены следующие IntegrationFlow:

@Bean
public IntegrationFlow mainFlow() {
    return IntegrationFlows
            .from(
                    databaseSource(),
                    c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
            .split()
            .log()
            .gateway(f -> f
                            .transform(Transformer::transform)
                            .transform(AnotherTransformer::transform),
                    e -> e
                            .errorChannel("transformErrorChannel"))
            .gateway(f -> f
                            .<MyEntity>handle((p, h) -> this.doSomething(p))
                            .<MyEntity>handle((p, h) -> this.doOtherThing(p)),
                    e -> e
                            .errorChannel("doErrorChannel"))
            .channel("nullChannel")
            .get();
}

Все transform и handle вызванные методы являются недействительными и возвращают ненулевые значения.Основная причина, по которой мы выбрали этот подход, состоит в том, чтобы иметь два разных канала для обработки ошибок в зависимости от того, в какой части потока они произошли, поэтому мы можем действовать соответственно.

Тем не менее, когда я пытаюсь запустить этот код иЯ вставляю запись в БД, и поллер забирает ее, она никогда не выходит за пределы первого шлюза.У меня просто есть такие строки журнала:

2018-06-06 11:43:58.848  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.848  INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@1863292e
2018-06-06 11:43:58.864  INFO 6492 --- [ask-scheduler-1] c.e.transformation.Transformer           : Performing transformation.
2018-06-06 11:43:58.864  INFO 6492 --- [ask-scheduler-1] c.e.transformation.AnotherTransformer    : Performing another transformation. 
2018-06-06 11:43:58.848  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : started org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.944  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
2018-06-06 11:43:58.944  INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@433a796
2018-06-06 11:43:58.944  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : started org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f

Кажется очевидным, что сообщение действительно приходит на первый шлюз, но, очевидно, оно не передается на второй шлюз.

Во время запуска явидно, что SI создает два подпотока (# 0 и # 1) и два канала для каждого (по-моему, по одному для каждой операции) с 1 подписчиком на каждый.

Я также попытался изменить определение на следующее:

    @Bean
public IntegrationFlow getRecords() {
    return IntegrationFlows
            .from(
                    databaseSource(),
                    c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
            .split()
            .log()
            .gateway(f -> f
                            .transform(Transformer::transform)
                            .transform(AnotherTransformer::transform),
                    e -> e
                            .errorChannel("transformErrorChannel")
                            .replyChannel("doThingsChannel"))
            .get();
}

@Bean
public IntegrationFlow doThings() {
    return IntegrationFlows
            .from(
                    "doThingsChannel")
            .gateway(f -> f
                            .<MyEntity>handle((p, h) -> this.doSomehting(p))
                            .<MyEntity>handle((p, h) -> this.doOtherThing(p)),
                    e -> e
                            .errorChannel("doErrorChannel"))
            .get();
}

Но в итоге возникла та же проблема: либо установить replyChannel на GatewayEndpointSpec, либо добавить явный поток .channel в getRecords после шлюза.

1 Ответ

0 голосов
/ 06 июня 2018

Я только что выполнил этот тестовый пример в проекте Spring Integration Java DSL:

@Test
public void testGateways() {
    IntegrationFlow flow = f -> f
            .gateway(sf -> sf
                    .transform(p -> "foo#" + p)
                    .transform(p -> "bar#" + p))
            .gateway(sf -> sf
                    .handle((p, h) -> "handle1:" + p)
                    .handle((p, h) -> "handle2:" + p))
            .handle(System.out::println);

    IntegrationFlowRegistration flowRegistration = this.integrationFlowContext.registration(flow).register();

    flowRegistration.getInputChannel()
            .send(new GenericMessage<>("test"));

    flowRegistration.destroy();
}

Мой вывод выглядит так:

GenericMessage [payload=handle2:handle1:bar#foo#test, headers={id=ae09df5c-f63e-4b68-d73c-29b85f3689a8, timestamp=1528314852110}]

Итак, оба шлюза работают какОжидается, и все трансформаторы и обработчики применяются.Кроме того, результат последнего шлюза опрашивается в основном потоке для последнего шага System.out.

Не уверен, что происходит в вашем случае: только идея, что ваш .transform(AnotherTransformer::transform) не возвращает значение илитам происходит что-то еще.

Относительно опции replyChannel.Это не куда отправить результат шлюза.Вот где ждать ответа для возврата:

/**
 * Specify the channel from which reply messages will be received; overrides the
 * encompassing gateway's default reply channel.
 * @return the channel name.
 */
String replyChannel() default "";
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...