RabbitMQ + Reactor, отправляющий ACK после передачи сообщения? - PullRequest
0 голосов
/ 17 января 2019

Я пытаюсь выяснить, является ли Project-Reactor приемлемым вариантом для моего варианта использования.

Мне нужна высокая согласованность в моем приложении, я хочу:

  1. Получите сообщение от RabbitMQ
  2. сделать некоторые манипуляции / преобразования
  3. Поместить сообщение в (другую) очередь RabbitMQ
  4. Отправить ACK для исходного сообщения

С обычной библиотекой Java RabbitMQ это выглядело бы примерно так:

var connectionFactory = new ConnectionFactory();
var connection = connectionFactory.newConnection();
var channel = connection.createChannel();
channel.txSelect();
channel.queueDeclare("queue.A", true, false, false, null);
channel.queueDeclare("queue.B", true, false, false, null);

channel.basicConsume("queue.A", false, (tag, delivery) ->
{
    String data = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println("Received: " + data);
    channel.basicPublish("", "queue.B", null, data.getBytes(StandardCharsets.UTF_8));
    channel.txCommit();
    System.out.println("Sending ACK");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    channel.txCommit();

}, tag -> System.err.println("Tag " + tag + " failed"));

Это хорошо печатает

Received: DATA
Sending ACK

Используя Reactor-RabbitMQ, я придумал следующее:

var options = new ReceiverOptions();
var receiver = RabbitFlux.createReceiver(options);
var sender = RabbitFlux.createSender();
sender.declare(QueueSpecification.queue("queue.A")).block();
sender.declare(QueueSpecification.queue("queue.B")).block();
sender
    .send(receiver.consumeManualAck("queue.A")
        .doOnError(ex -> ex.printStackTrace())
        .doOnEach(s ->
        {
            s.get().ack();
            print("ACK");
        })
        .map(ad ->
        {
            print("MAP");
            return new OutboundMessage("", "queue.B", ad.getBody());
        }))
    .doOnEach(v ->
    {
        print("SUB");
    })
    .subscribe();

Однако, это печатает

ACK
MAP

Очевидно, что это неправильный порядок, потому что я делаю doOnEach до map. Но AcknowledgableDelivery больше не доступен после того, как я сопоставил его с OutboundMessage для отправителя.

обратите внимание, что SUB никогда не печатается. Возможно, потому что подписка send является непрерывной и никогда не достигает состояния завершения Mono<Void>.

Мои знания о Reactor не столь обширны, поэтому я чувствую, что упускаю что-то, что я мог бы использовать здесь. Можно ли сделать это аккуратно с Reactor или я должен забыть об этом?


Сноска: я использую var, потому что я нахожусь в Java 11, и операторы print как способ количественного определения порядка, в котором выполняются вещи.

1 Ответ

0 голосов
/ 18 января 2019

Я не думаю, что это можно подтвердить внутри метода отправки. Вместо этого вы можете попробовать:

receiver.consumeManualAck("queue.A")
        .flatMap(ad -> {
           Mono<OutboundMessage> outboundMessageMono =
                Mono.just(new OutboundMessage("", "queue.B", ad.getBody()));
           return sender.send(outboundMessageMono)
                        .doFinally(signalType -> {
                            if (signalType == SignalType.ON_COMPLETE) {
                               ad.ack();
                            } else {
                               ad.nack(false);
                           }
                         });
         });
...