Вы можете использовать пружину MessageListener
, чтобы получить все сообщения и связать их с контроллером с помощью Mono.create(...)
и вашим собственным прослушивателем событий, который вызывает результат Mono
// Consumes message and trigger result Mono
public interface MyEventListener extends Consumer<MyOutputMessage> {}
Класс для маршрутизации входящих сообщений для исправления MyEventListener
public class MyMessageProcessor {
// You could use in-memory cache here if you need ttl etc.
private static final ConcurrentHashMap<String, MyEventListener> REGISTRY
= new ConcurrentHashMap<>();
public void register(String correlationId, MyEventListener listener) {
MyEventListener oldListeer = REGISTRY.putIfAbsent(correlationId, listener);
if (oldListeer != null)
throw new IllegalStateException("Correlation ID collision!");
}
public void unregister(String correlationId) {
REGISTRY.remove(correlationId);
}
public void accept(String correlationId, MyOutputMessage myOutputMessage) {
Optional.ofNullable(REGISTRY.get(correlationId))
.ifPresent(listener -> listener.accept(myOutputMessage));
}
}
Контроллер Webflux
private final MyMessageProcessor messageProcessor;
....
@PostMapping("/process")
Mono<MyOutputMessage> process(Mono<MyInputMessage> inputMessage) {
String correlationId = ...; //generate correlationId
// then send message asynchronously
return Mono.<MyOutputMessage>create(sink ->
// create and save MyEventListener which call MonoSink.success
messageProcessor.register(correlationId, sink::success))
// define timeout if you don't want to wait forever
.timeout(...)
// cleanup MyEventListener after success, error or cancel
.doFinally(ignored -> messageProcessor.unregister(correlationId));
}
И в onMessage
вашей реализации JMS MessageListener
вы можете вызвать
messageProcessor.accept(correlationId, myOutputMessage);
Вы можете найти аналогичный пример для Fluxв справочном руководстве по реактору 3 1018 *