Я пытаюсь создать рецепт асинхронной оркестрации с использованием шлюзов интеграции Spring (как входящих, так и исходящих).Посмотрев пример здесь , я попытался использовать scatter-collect следующим образом:
@Configuration
public class IntegrationComponents {
@Value("${rest.endpoint.base}")
private String endpointBase;
@Bean
public HttpRequestHandlingMessagingGateway inboundGateway() {
return Http.inboundGateway("/test-inbound-gateway-resource")
.requestMapping(mapping -> mapping.methods(HttpMethod.POST))
.requestTimeout(3000)
.replyTimeout(3000)
.get();
}
@Bean
public HttpRequestExecutingMessageHandler outboundGateway1() {
return Http.outboundGateway(endpointBase + "/test-resource-1")
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class)
.get();
}
@Bean
public HttpRequestExecutingMessageHandler outboundGateway2() {
return Http.outboundGateway(endpointBase + "/test-resource-2")
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class)
.get();
}
@Bean
public StandardIntegrationFlow integrationFlow() {
ExecutorService executor = Executors.newCachedThreadPool();
IntegrationFlow flow1 = IntegrationFlows.from(MessageChannels.executor(executor))
.handle(outboundGateway1())
.get();
IntegrationFlow flow2 = IntegrationFlows.from(MessageChannels.executor(executor))
.handle(outboundGateway2())
.get();
return IntegrationFlows
.from(inboundGateway())
.transform(String.class, String::toUpperCase)
.channel(MessageChannels.executor(executor))
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(flow1)
.recipientFlow(flow2),
gatherer -> gatherer
.outputProcessor(messageGroup -> {
List<Message<?>> list = new ArrayList<>(messageGroup.getMessages());
String payload1 = (String) list.get(0).getPayload();
String payload2 = (String) list.get(1).getPayload();
return MessageBuilder.withPayload(payload1 + "+" + payload2).build();
}))
.get();
}
}
Это выполняется, но мои полезные данные меняются местами, потому что в этом случае outboundGateway1 выполняется дольше, чем outboundGateway2,Сначала идет полезная нагрузка 2, а затем полезная нагрузка 1.
Есть ли способ указать сборке разброса для определения / поддержания порядка при отправке на выходной процессор?
На аналогичной ноте, может быть, лучше разделить / объединить и / или использовать маршрутизатор?Но если это так, как бы это выглядело?
Я попробовал следующее разделение / маршрут / агрегат, но он не смог сказать "The currentComponent" (org.springframework.integration.router.RecipientListRouter@b016b4e) является односторонним MessageHandler и не подходит для настройки outputChannel. Это конец потока интеграции. ":
@Configuration
public class IntegrationComponents {
@Value("${rest.endpoint.base}")
private String endpointBase;
@Bean
public HttpRequestHandlingMessagingGateway inboundGateway() {
return Http.inboundGateway("/test-inbound-gateway-resource")
.requestMapping(mapping -> mapping.methods(HttpMethod.POST))
.requestTimeout(3000)
.replyTimeout(3000)
.get();
}
@Bean
public HttpRequestExecutingMessageHandler outboundGateway1() {
return Http.outboundGateway(endpointBase + "/test-resource-1")
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class)
.get();
}
@Bean
public HttpRequestExecutingMessageHandler outboundGateway2() {
return Http.outboundGateway(endpointBase + "/test-resource-2")
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class)
.get();
}
@Bean
public StandardIntegrationFlow integrationFlow() {
ExecutorService executor = Executors.newCachedThreadPool();
IntegrationFlow flow1 = IntegrationFlows.from(MessageChannels.executor(executor))
.handle(outboundGateway1())
.get();
IntegrationFlow flow2 = IntegrationFlows.from(MessageChannels.executor(executor))
.handle(outboundGateway2())
.get();
return IntegrationFlows
.from(inboundGateway())
.transform(String.class, String::toUpperCase)
.split()
.channel(MessageChannels.executor(executor))
.routeToRecipients(r -> r
.recipientFlow(flow1)
.recipientFlow(flow2))
.aggregate()
.get();
}
}