Асинхронное получение значения от IBM mq с динамическим корреляционным идентификатором - PullRequest
0 голосов
/ 22 октября 2018

Я отправляю сообщения в ibm mq с некоторым значением correlationId (уникальным для каждого сообщения).Затем я хочу прочитать из очереди вывода это конкретное сообщение с определенным корреляционным идентификатором, и я хочу, чтобы оно было неблокирующим, чтобы использовать его в контроллере java webflux.

Мне интересно, есть ли способ сделать этобез боли?Такие параметры, как jmsTemplate.receiveSelected (...) блокируются, в то время как создание интерфейса реализации компонента EJB MessageListener не предоставляет способ выбора сообщения с помощью динамического селектора (т. Е. CorrelationId уникален для каждого сообщения).

1 Ответ

0 голосов
/ 23 октября 2018

Вы можете использовать пружину MessageListener, чтобы получить все сообщения и связать их с контроллером с помощью Mono.create(...) и вашим собственным прослушивателем событий, который вызывает результат Mono

// Consumes message and trigger result Mono
public interface MyEventListener extends Consumer<MyOutputMessage> {}

Класс для маршрутизации входящих сообщений для исправления MyEventListener

public class MyMessageProcessor {
    // You could use in-memory cache here if you need ttl etc.
    private static final ConcurrentHashMap<String, MyEventListener> REGISTRY
            = new ConcurrentHashMap<>();

    public void register(String correlationId, MyEventListener listener) {
        MyEventListener oldListeer = REGISTRY.putIfAbsent(correlationId, listener);
        if (oldListeer != null)
            throw new IllegalStateException("Correlation ID collision!");
    }

    public void unregister(String correlationId) {
        REGISTRY.remove(correlationId);
    }

    public void accept(String correlationId, MyOutputMessage myOutputMessage) {
        Optional.ofNullable(REGISTRY.get(correlationId))
                .ifPresent(listener -> listener.accept(myOutputMessage));
    }
}

Контроллер Webflux

private final MyMessageProcessor messageProcessor;

.... 

@PostMapping("/process")
Mono<MyOutputMessage> process(Mono<MyInputMessage> inputMessage) {
    String correlationId = ...; //generate correlationId

    // then send message asynchronously

    return Mono.<MyOutputMessage>create(sink ->
            // create and save MyEventListener which call MonoSink.success
            messageProcessor.register(correlationId, sink::success))
            // define timeout if you don't want to wait forever
            .timeout(...)
            // cleanup MyEventListener after success, error or cancel
            .doFinally(ignored -> messageProcessor.unregister(correlationId));
}

И в onMessage вашей реализации JMS MessageListener вы можете вызвать

messageProcessor.accept(correlationId, myOutputMessage);

Вы можете найти аналогичный пример для Fluxв справочном руководстве по реактору 3 1018 *

...