Spring-Integration: отправка сообщений в очереди для выборочного потребителя - PullRequest
0 голосов
/ 29 мая 2018

У меня есть пружинный интеграционный поток, который генерирует сообщения, которые следует хранить, ожидая, пока соответствующий потребитель придет и потребит их.

@Bean
public IntegrationFlow messagesPerCustomerFlow() {
    return IntegrationFlows.
            from(WebFlux.inboundChannelAdapter("/messages/{customer}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.POST)
                    )
                    .requestPayloadType(JsonNode.class)
                    .headerExpression("customer", "#pathVariables.customer")
            )
            .channel(messagesPerCustomerQueue()) 
            .get();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(100);
}

@Bean
public QueueChannel messagesPerCustomerQueue() {
    return MessageChannels.queue()
            .get();
}

Сообщения в очереди должны доставляться как отправленные серверомсобытия через http, как показано ниже.

PublisherSubscription является просто держателем для Publisher и IntegrationFlowRegistration, последнее используется для уничтожения динамически создаваемого потока, когда он больше не нужен (обратите внимание, что входящее сообщение дляGET не имеет содержимого, которое не обрабатывается должным образом ATM посредством интеграции Webflux, поэтому для получения доступа к переменной пути, помещенной в заголовок customer, требуется небольшой обходной путь:

@Bean
public IntegrationFlow eventMessagesPerCustomer() {
    return IntegrationFlows
       .from(WebFlux.inboundGateway("/events/{customer}")
            .requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
            .headerExpression("customer", "#pathVariables.customer")
            .payloadExpression("''") // neeeded to make handle((p,h) work
       )
       .log()
       .handle((p, h) -> {
           String customer = h.get("customer").toString();
           PublisherSubscription<JsonNode> publisherSubscription =
               subscribeToMessagesPerCustomer(customer);
           return Flux.from(publisherSubscription.getPublisher())
                   .map(Message::getPayload)
                   .doFinally(signalType ->
                      publisherSubscription.unsubscribe());
       })
       .get();
}

Вышеупомянутый запросдля событий, отправляемых сервером, динамически регистрируется поток, который подписывается на канал очереди по требованию с избирательным потребителем , реализованным фильтром с throwExceptionOnRejection(true).Следуя спецификации для Цепочка обработчиков сообщений , которая должна гарантировать, что сообщение предлагается всем потребителям, пока один не примет его.

public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
    IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
            .filter("headers.customer=='" + customer + "'",
                    filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
    Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();

    IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
            .register();

    return new PublisherSubscription<>(messagePublisher, registration);
}

Эта конструкция работает в принципе, но со следующими проблемами:

  • Сообщения, отправленные в очередь, когда подписчиков вообще нет, ведут к MessageDeliveryException: Dispatcher has no subscribers for channel 'application.messagesPerCustomerQueue'
  • Сообщения, отправленные в очередь, пока нет соответствующих подписчиков, все же приводят к AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed.

Я хочу, чтобы сообщение оставалось в очереди и неоднократно предлагалось всем подписчикам до тех пор, пока оно не будет использовано или истечет срок годности (правильный выборочный потребитель).Как я могу это сделать?

1 Ответ

0 голосов
/ 29 мая 2018

обратите внимание, что входящее сообщение для GET не имеет содержимого, которое не обрабатывается должным образом ATM посредством интеграции Webflux

Я не понимаю эту проблему.

WebFluxInboundEndpoint работает с этим алгоритмом:

if (isReadable(request)) {
   ...
else {
    return (Mono<T>) Mono.just(exchange.getRequest().getQueryParams());
}

Где метод GET действительно идет к ветви elsepayload сообщения для отправки - MultiValueMap.А также мы недавно исправили с вами проблему для POST, которая также выпущена в версии 5.0.5: https://jira.spring.io/browse/INT-4462

У Диспетчера нет подписчиков

Не может случиться на QueueChannel в принципе.Там вообще нет ни одного диспетчера.Это просто очередь, и отправитель предлагает сообщение для хранения.Вам не хватает чего-то еще, чтобы поделиться с нами.Но давайте назовем вещи своими именами: messagesPerCustomerQueue не является QueueChannel в вашем приложении.

ОБНОВЛЕНИЕ

Относительно:

Я хочу, чтобы сообщение оставалось в очереди и неоднократно предлагалось всем подписчикам до тех пор, пока оно не будет или потреблено, или срок его действия истечет (правильный выборочный потребитель)

Только то, что мы видим, является PollableJmsChannel на основе встроенного ActiveMQ для соблюдения TTL для сообщений.Как потребитель этой очереди вы должны иметь PublishSubscribeChannel с setMinSubscribers(1), чтобы сделать MessagingTemplate, чтобы бросить MessageDeliveryException, когда еще нет подписчиков.Таким образом, транзакция JMS будет откатана, и сообщение вернется в очередь для следующего цикла опроса.

Проблема с оперативной памятью QueueChannel в том, что нет транзакционной повторной доставки, и сообщение после опроса из этой очереди будет потеряно.

Другая опция, аналогичная JMS (транзакционная), заключается вJdbcChannelMessageStore для QueueChannel.Хотя, таким образом, у нас нет функциональности TTL ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...