Асинхронные разделенные / агрегированные шлюзы - PullRequest
0 голосов
/ 28 сентября 2018

Я пытаюсь создать рецепт асинхронной оркестрации с использованием шлюзов интеграции Spring (как входящих, так и исходящих).Посмотрев пример здесь , я попытался использовать scatter-collect следующим образом:

@Configuration
public class IntegrationComponents {

    @Value("${rest.endpoint.base}")
    private String endpointBase;

    @Bean
    public HttpRequestHandlingMessagingGateway inboundGateway() {
        return Http.inboundGateway("/test-inbound-gateway-resource")
            .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
            .requestTimeout(3000)
            .replyTimeout(3000)
            .get();
    }

    @Bean
    public HttpRequestExecutingMessageHandler outboundGateway1() {
        return Http.outboundGateway(endpointBase + "/test-resource-1")
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(String.class)
            .get();
    }

    @Bean
    public HttpRequestExecutingMessageHandler outboundGateway2() {
        return Http.outboundGateway(endpointBase + "/test-resource-2")
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(String.class)
            .get();
    }

    @Bean
    public StandardIntegrationFlow integrationFlow() {
        ExecutorService executor = Executors.newCachedThreadPool();

        IntegrationFlow flow1 = IntegrationFlows.from(MessageChannels.executor(executor))
            .handle(outboundGateway1())
            .get();

        IntegrationFlow flow2 = IntegrationFlows.from(MessageChannels.executor(executor))
            .handle(outboundGateway2())
            .get();

        return IntegrationFlows
            .from(inboundGateway())
            .transform(String.class, String::toUpperCase)
            .channel(MessageChannels.executor(executor))
            .scatterGather(
                    scatterer -> scatterer
                        .applySequence(true)
                        .recipientFlow(flow1)
                        .recipientFlow(flow2),
                    gatherer -> gatherer
                        .outputProcessor(messageGroup -> {
                            List<Message<?>> list = new ArrayList<>(messageGroup.getMessages());

                            String payload1 = (String) list.get(0).getPayload();
                            String payload2 = (String) list.get(1).getPayload();

                            return MessageBuilder.withPayload(payload1 + "+" + payload2).build();
                        }))
            .get();
    }
}

Это выполняется, но мои полезные данные меняются местами, потому что в этом случае outboundGateway1 выполняется дольше, чем outboundGateway2,Сначала идет полезная нагрузка 2, а затем полезная нагрузка 1.

Есть ли способ указать сборке разброса для определения / поддержания порядка при отправке на выходной процессор?

На аналогичной ноте, может быть, лучше разделить / объединить и / или использовать маршрутизатор?Но если это так, как бы это выглядело?

Я попробовал следующее разделение / маршрут / агрегат, но он не смог сказать "The currentComponent" (org.springframework.integration.router.RecipientListRouter@b016b4e) является односторонним MessageHandler и не подходит для настройки outputChannel. Это конец потока интеграции. ":

@Configuration
public class IntegrationComponents {

    @Value("${rest.endpoint.base}")
    private String endpointBase;

    @Bean
    public HttpRequestHandlingMessagingGateway inboundGateway() {
        return Http.inboundGateway("/test-inbound-gateway-resource")
            .requestMapping(mapping -> mapping.methods(HttpMethod.POST))
            .requestTimeout(3000)
            .replyTimeout(3000)
            .get();
    }

    @Bean
    public HttpRequestExecutingMessageHandler outboundGateway1() {
        return Http.outboundGateway(endpointBase + "/test-resource-1")
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(String.class)
            .get();
    }

    @Bean
    public HttpRequestExecutingMessageHandler outboundGateway2() {
        return Http.outboundGateway(endpointBase + "/test-resource-2")
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(String.class)
            .get();
    }

    @Bean
    public StandardIntegrationFlow integrationFlow() {
        ExecutorService executor = Executors.newCachedThreadPool();

        IntegrationFlow flow1 = IntegrationFlows.from(MessageChannels.executor(executor))
            .handle(outboundGateway1())
            .get();

        IntegrationFlow flow2 = IntegrationFlows.from(MessageChannels.executor(executor))
            .handle(outboundGateway2())
            .get();

        return IntegrationFlows
            .from(inboundGateway())
            .transform(String.class, String::toUpperCase)
            .split()
            .channel(MessageChannels.executor(executor))
            .routeToRecipients(r -> r
                .recipientFlow(flow1)
                .recipientFlow(flow2))
            .aggregate()
            .get();
    }
}

1 Ответ

0 голосов
/ 01 октября 2018

Разве вы не можете просто Collections.sort() список в выходном процессоре?Каждое сообщение будет иметь заголовок IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, так как вы установите applySequence.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...