параллельный поток-агрегатный поток с пружинной интеграцией завершается неудачно из-за односторонней передачи сообщений - PullRequest
0 голосов
/ 01 мая 2018

Я хочу обрабатывать список элементов параллельно, разделяя их, направляя каждый элемент на соответствующий шлюз и агрегируя результаты. Однако мое приложение не запускается, я получаю следующую ошибку:

BeanCreationException: The 'currentComponent' ... is a one-way 'MessageHandler' 
and it isn't appropriate to configure 'outputChannel'. 
This is the end of the integration flow.

Это примерное определение потока, которое иллюстрирует поведение:

@Bean
public IntegrationFlow parallelSplitRouteAggregateFlow() {
    return IntegrationFlows
            .from(Http.inboundGateway("/trigger"))
            .handle(message -> Arrays.asList(1, 2, 3))
            .split()
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .<Integer, Boolean>route(o -> o % 2 == 0, m -> m
                    .subFlowMapping(true, oddFlow())
                    .subFlowMapping(false, evenFlow()))
            .aggregate()
            .get();
}

@Bean
public IntegrationFlow oddFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "odd");
}

@Bean
public IntegrationFlow evenFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "even");
}

Я видел Ошибка »- это односторонний« MessageHandler »для агрегатора пружинной интеграции DSL , но решение здесь не применимо, я не регистрируюсь в методе handle (). Я также попытался добавить .defaultOutputToParentFlow () к mappingDefinition, потому что в примере с кафе его использует, но это тоже не имеет значения.

Я должен упомянуть, что это - весенняя интеграция 5.0.4 с выпуском spring-boot 2.0.1.

Ответы [ 3 ]

0 голосов
/ 01 мая 2018
@Bean
public IntegrationFlow parallelSplitRouteAggregateFlow() {
    return IntegrationFlows
            .from(Http.inboundGateway("/trigger"))
            .handle((p, h) -> Arrays.asList(1, 2, 3))
            .split()
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .<Integer, Boolean>route(o -> o % 2 == 0, m -> m
                    .subFlowMapping(true, oddFlow())
                    .subFlowMapping(false, evenFlow()))
            .get();
}

@Bean
public IntegrationFlow oddFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "odd")
            .channel("agg.input");
}

@Bean
public IntegrationFlow evenFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "even")
        .channel("agg.input");
}

@Bean
public IntegrationFlow agg() {
    return f -> f.aggregate();
}

enter image description here

0 голосов
/ 17 мая 2019

Если вам нужно «распределить» сообщения по нескольким «работникам» и вернуть сообщения в точку соединения, есть метод .scatterGather(...). По-видимому, он оборачивает функциональность .route(...) способом, более подходящим для использования в домене IntegrationFlow.

Это показано в следующем примере:

    @Bean
    public IntegrationFlow evenOddFlows() {
        return IntegrationFlows.from(Http.inboundGateway("/trigger"))
                .handle((payload,headers)->Arrays.asList(1,2,3))
                .split()
                .scatterGather(r->r.applySequence(true)
                .recipientFlow(m->(int)m.getPayload()%2==0, evenFlow-> evenFlow.log(m->"Even flow with payload: "+m.getPayload()).<Integer,Integer>transform(h-> h+50)
                        .handle((payload,headers)->(int)payload+50).log(m->"At Even flow end with payload: "+m.getPayload())
                        .handle((payload,headers)->payload) /* This .handle(...) doesn't do a real job.
* Instead, it is to patch something that at least I regard as a bug.
* Having not the .handle(...) would leave the .log(...) at the end of the flow.
* After crossing a .log(...) if right at the flow's end, the response message doesn't arrive back the parent flow (hence my aprising there is a bug).
* With the "appended" .handle(...) afterwards, avoid the .log(...) being the last one in the flow, as well as tests show the message is sent away where the parent flow receives it.
*/
                        )
                        .recipientFlow(m->(int)m.getPayload()%2!=0, oddFlow-> oddFlow.log(m->"Odd flow with payload: "+m.getPayload()).<Integer,Integer>transform(h-> h+10)
                        .handle((payload,headers)->(int)payload+10).log(m->"At Odd flow end with payload: "+m.getPayload())
                        .handle((payload,headers)->payload) // This .handle(...) I needed as a patch because otherwise the .log(...) being the last one in the subflow swallowed the message
                        )
                        )
                        .aggregate()
                        .get()
                ;
    }
curl -i -H "Content-type: application/json" http://localhost:8080/trigger

Curl выходы:

[[21],[102],[23]]

Журналы:

2019-05-17 16:19:11.061  INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler   : Odd flow with payload: 1
2019-05-17 16:19:11.061  INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler   : At Odd flow end with payload: 21
2019-05-17 16:19:11.061  INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler   : Even flow with payload: 2
2019-05-17 16:19:11.061  INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler   : At Even flow end with payload: 102
2019-05-17 16:19:11.061  INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler   : Odd flow with payload: 3
2019-05-17 16:19:11.061  INFO 10148 --- [nio-8080-exec-1] o.s.integration.handler.LoggingHandler   : At Odd flow end with payload: 23
0 голосов
/ 01 мая 2018

Ваша проблема здесь:

.handle(message -> Arrays.asList(1, 2, 3))

если бы вы использовали встроенную реализацию, она бы выглядела так:

.handle(new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            Arrays.asList(1, 2, 3);
        }
})

Обратите внимание на тип возврата void. Поскольку возвращать нечего, следовательно, отправлять нисходящий поток нечего - is a one-way 'MessageHandler'.

Чтобы решить вашу проблему, вам нужно сделать следующее:

 .handle((p, h) -> Arrays.asList(1, 2, 3))

что эквивалентно этому:

.handle(new GenericHandler<Object>() {

        @Override
        public Object handle(Object p, Map<String, Object> h) {
            return Arrays.asList(1, 2, 3);
        }
})

На самом деле моя ИДЕЯ говорит мне за ваш вариант вроде:

enter image description here

Это дает мне некоторую подсказку, что я делаю что-то не так.

UPDATE

Рабочий код:

@Bean
public IntegrationFlow parallelSplitRouteAggregateFlow() {
    return IntegrationFlows
            .from(Http.inboundGateway("/trigger"))
            .handle((p, h) -> Arrays.asList(1, 2, 3))
            .split()
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .<Integer, Boolean>route(o -> o % 2 == 0, m -> m
                    .subFlowMapping(true, sf -> sf.gateway(oddFlow()))
                    .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate()
            .get();
}

@Bean
public IntegrationFlow oddFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "odd");
}

@Bean
public IntegrationFlow evenFlow() {
    return flow -> flow.<Integer>handle((payload, headers) -> "even");
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...