У меня есть пружинный интеграционный поток, который генерирует сообщения, которые следует хранить, ожидая, пока соответствующий потребитель придет и потребит их.
@Bean
public IntegrationFlow messagesPerCustomerFlow() {
return IntegrationFlows.
from(WebFlux.inboundChannelAdapter("/messages/{customer}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.requestPayloadType(JsonNode.class)
.headerExpression("customer", "#pathVariables.customer")
)
.channel(messagesPerCustomerQueue())
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(100);
}
@Bean
public QueueChannel messagesPerCustomerQueue() {
return MessageChannels.queue()
.get();
}
Сообщения в очереди должны доставляться как отправленные серверомсобытия через http, как показано ниже.
PublisherSubscription является просто держателем для Publisher и IntegrationFlowRegistration, последнее используется для уничтожения динамически создаваемого потока, когда он больше не нужен (обратите внимание, что входящее сообщение дляGET не имеет содержимого, которое не обрабатывается должным образом ATM посредством интеграции Webflux, поэтому для получения доступа к переменной пути, помещенной в заголовок customer
, требуется небольшой обходной путь:
@Bean
public IntegrationFlow eventMessagesPerCustomer() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/events/{customer}")
.requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
.headerExpression("customer", "#pathVariables.customer")
.payloadExpression("''") // neeeded to make handle((p,h) work
)
.log()
.handle((p, h) -> {
String customer = h.get("customer").toString();
PublisherSubscription<JsonNode> publisherSubscription =
subscribeToMessagesPerCustomer(customer);
return Flux.from(publisherSubscription.getPublisher())
.map(Message::getPayload)
.doFinally(signalType ->
publisherSubscription.unsubscribe());
})
.get();
}
Вышеупомянутый запросдля событий, отправляемых сервером, динамически регистрируется поток, который подписывается на канал очереди по требованию с избирательным потребителем , реализованным фильтром с throwExceptionOnRejection(true)
.Следуя спецификации для Цепочка обработчиков сообщений , которая должна гарантировать, что сообщение предлагается всем потребителям, пока один не примет его.
public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
.filter("headers.customer=='" + customer + "'",
filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();
IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
.register();
return new PublisherSubscription<>(messagePublisher, registration);
}
Эта конструкция работает в принципе, но со следующими проблемами:
- Сообщения, отправленные в очередь, когда подписчиков вообще нет, ведут к
MessageDeliveryException: Dispatcher has no subscribers for channel 'application.messagesPerCustomerQueue'
- Сообщения, отправленные в очередь, пока нет соответствующих подписчиков, все же приводят к
AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed
.
Я хочу, чтобы сообщение оставалось в очереди и неоднократно предлагалось всем подписчикам до тех пор, пока оно не будет использовано или истечет срок годности (правильный выборочный потребитель).Как я могу это сделать?