Я новичок в Spring Integration и пытаюсь использовать корпоративный шаблон scatter-collect , но я борюсь с деталями реализации и с доступными примерами, которые я могу найти в Интернете.
Короче говоря, мой сценарий:
- HTTP-запрос от пользователя отправляется в систему A.
- Перед тем как ответить (он же синхронный), система A отправляет N число сообщения для N числа систем X, асинхронно.
- Система A сидит и ждет ответов.
- После получения ответа от каждой из систем запросов система A объединяет ответы в одну большую ответ.
- Система A, наконец, отвечает пользователю более широким ответом.
По сути, что касается первоначального потребителя, единственным является запрос, который отвечает ответом, без необходимости «вернуться позже». Тем не менее, этот запрос был фактически к фасаду, который маскирует сложность, которая лежит за ним (потенциально поражая сотни систем, делая синхронные запросы на внутреннем сервере неработоспособными и неосуществимыми).
Пока у меня есть эта реализация (вычеркнутые детали могут быть не 1: 1 примером того, с чем я играю, например, разработанная мной CorlationStrategy не выполняет то, что я ожидал):
@Bean
public IntegrationFlow overallRequest(final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from( // HTTP endpoint to user makes requests on
Http.inboundChannelAdapter("/request-overall-document")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class))
.log()
// Arbitrary header to simplify example, realistically would generate a UUID
// and attach to some correlating header that works for systems involved
.enrichHeaders(p -> p.header("someHeader", "someValue"))
.log()
.scatterGather(
recipientListRouterSpec ->
recipientListRouterSpec
.applySequence(true)
.recipientFlow(
flow ->
flow.handle( // Straight pass through of msg received to see in response
Amqp.outboundAdapter(amqpTemplate)
.exchangeName( // RabbitMQ fanout exchange to N queues to N systems
"request-overall-document-exchange"))),
aggregatorSpec ->
aggregatorSpec
// Again for example, arbitrary once two correlated responses
.correlationStrategy(msg -> msg.getHeaders().get("someHeader"))
.releaseStrategy(gm -> gm.size() == 2)
// Simple string concatenation for overall response
.outputProcessor(
msgrp ->
msgrp.getMessages().stream()
.map(msg -> msg.getPayload().toString())
.reduce("Overall response: ", (nexus, txt) -> nexus + "|" + txt))
// Reset group on each response
.expireGroupsUponCompletion(true),
scatterGatherSpec ->
scatterGatherSpec.gatherChannel(
responseChannel())) // The channel to listen for responses to request on
.log()
.get();
}
С это как конфигурация канала ответа:
@Bean
public MessageChannel responseChannel() {
return new QueueChannel();
}
@Bean
public AmqpInboundChannelAdapter responseChannelAdapter(
SimpleMessageListenerContainer listenerContainer,
@Qualifier("responseChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer responseContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("request-overall-document-responses");
return container;
}
Со всеми ответами, отправляемыми в отдельное Spring-приложение, которое просто перенаправляет полезные данные запроса обратно (то есть для тестирования без необходимости интеграции с фактическим системы):
@Bean
public IntegrationFlow systemOneReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-1"))
.log()
.handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
.get();
}
@Bean
public IntegrationFlow systemTwoReception(final ConnectionFactory connectionFactory, final AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "request-overall-document-system-2"))
.log()
.handle(Amqp.outboundAdapter(amqpTemplate).routingKey("request-overall-document-responses"))
.get();
}
И я получаю следующую ошибку в системе A при успешном выпуске в соответствии со стратегией агрегации / выпуска в реализации рассеяния-сбора:
2020-02-29 20:06:39.255 ERROR 152 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |somerequesttobesent|somerequesttobesent, headers={amqp_receivedDeliveryMode=PERSISTENT, content-length=19, amqp_deliveryTag=2, sequenceSize=1, amqp_redelivered=false, amqp_contentEncoding=UTF-8, host=localhost:18081, someHeader=someValue, connection=keep-alive, correlationId=182ee203-85ab-9ef6-7b19-3a8e2da8f5a7, id=994a0cf5-ad2b-02c3-dc93-74fae2f5092b, cache-control=no-cache, contentType=text/plain, timestamp=1583006799252, http_requestMethod=POST, sequenceNumber=1, amqp_consumerQueue=request-overall-document-responses, accept=*/*, amqp_receivedRoutingKey=request-overall-document-responses, amqp_timestamp=Sat Feb 29 20:06:39 GMT 2020, amqp_messageId=3341deae-7ed0-a042-0bb7-d2d2be871165, http_requestUrl=http://localhost:18081/request-overall-document, amqp_consumerTag=amq.ctag-ULxwuAjp8ZzcopBZYvcbZQ, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.22.0}]
at org.springframework.integration.scattergather.ScatterGatherHandler.lambda$doInit$2(ScatterGatherHandler.java:160)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:77)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:71)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:823)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:475)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Теперь я понимаю У меня есть несколько пробелов, но я изо всех сил пытаюсь понять, как двигаться вперед:
- Данная ошибка: это отсутствует какой-либо вывод команды collectResultChannel. Я бы подумал, что это были бы последующие «дескрипторы» / «логи» / мы в результате вызова scatterGather (...), но не так, чтобы поиграть.
- Должна существовать некоторая форма отображения от результата агрегации рассеяния до исходного запроса Http.XXX.
EDIT : от Дальнейшее копание приводит к тому, что проблема возникает из-за того, что при выходе через AMQP (в моем случае RabbitMQ) рассматриваемый заголовок намеренно отбрасывается , поскольку это MessageChannel (см. строки 230–257) . Не уверен, что подразумевается, что разделение / агрегирование не предназначено для пересечения между несколькими независимыми приложениями (я предполагаю, что оно отброшено, потому что это экземпляр объекта Java, который было бы проблематично c передать). ..
ДАЛЬНЕЙШЕЕ РЕДАКТИРОВАНИЕ : глаза fre sh заметили то, чего раньше не было, исключение, которое я вставил в кавычки неудачного сообщения, и это, кажется, явный результат обработка вывода (при переключении между DirectChannel и QueueChannel только DirectChannel не печатает полезную нагрузку, поэтому не искал ее). Чтобы быть уверенным, что он не выполнял какое-либо клонирование или что-то странное, обновил службу-заглушку для преобразования и добавления уникальных постфиксов (как показано ниже), и да, это на самом деле агрегирование.
.transform(msg -> MessageFormat.format("{0}_system1response", msg))
.transform(msg -> MessageFormat.format("{0}_system2response", msg))
The 'gatherResultChannel' header is required to deliver the gather result., failedMessage=GenericMessage [payload=Overall response: |sometext_system2response|sometext_system1response, hea...
Так что это похоже на рассеяние , сбор и агрегация - все работает, единственное, что не является тем, что данная обработка не знает, куда отправлять sh сообщения после этого?
ОДИН РАЗ БОЛЬШЕ: Согласно В ответ Гари заменил все адаптеры шлюзами, но при этом больше не может разветвляться? Поэтому удалил аргумент scatterGatherSpe c из вызова scatterGather и заменил / добавил два получателя следующим образом:
.recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-1"), e -> e.id("sytemOneOutboundGateway")))
.recipientFlow(flow -> flow.handle(Amqp.asyncOutboundGateway(asyncTemplate).routingKeyFunction(m -> "request-overall-document-system-2"), e -> e.id("sytemTwoOutboundGateway")))
, что, однако, является наиболее близким к рабочему примеру, хотя это работы, это приводит к повторной обработке сообщений многократных очередей включения / выключения, где мой ожидаемый результат для POST с 'msgtosend' был бы:
Overall message: |msgtosend_system1response|msgtosend_system2response
Вместо этого я получаю sporadi c выходные данные, такие как:
Overall message: |msgtosend|msgtosend_system1response
Overall message: |msgtosend_system2response|msgtosend_system1response_system1response
Overall message: |msgtosend|msgtosend_system1response_system1response
Overall message: |msgtosend_system2response|msgtosend_system1response_system1response
Я предполагаю, что есть некоторое перекрытие config / bean, но попытайтесь, как я мог, я не могу изолировать, что это такое, то есть фабрика соединений, контейнер слушателя , asyn c template, et c. и др c.