Vert.x: обрабатывать обратное давление потребителей Rabbitmq, не сбрасывая новые сообщения - PullRequest
1 голос
/ 02 июля 2019

Когда у вас больше входящих сообщений, чем вы можете обработать. Решением является ограничение размера внутренней очереди:

// Limit to max 300 messages
QueueOptions options = new QueueOptions()
  .setMaxInternalQueueSize(300);

RabbitMQClient client = RabbitMQClient.create(vertx, new RabbitMQOptions());

client.basicConsumer("my.queue", options, res -> {
  if (res.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer mqConsumer = res.result();
    mqConsumer.handler((RabbitMQMessage message) -> {
      System.out.println("Got message: " + message.body().toString());
    });
  } else {
    res.cause().printStackTrace();
  }
});

Проблема заключается в том, что при превышении емкости очереди внутренней очереди новое сообщение будет просто отброшено .

Как справиться с обратным давлением, не потеряв ни одного сообщения?

1 Ответ

0 голосов
/ 02 июля 2019

Вызвать mqConsumer.pause(), когда вы начнете обрабатывать сообщение, затем mqConsumer.resume(), когда вы закончите его обрабатывать.Вы всегда должны делать это, когда ваша обработка не является синхронной.

https://vertx.io/docs/vertx-rabbitmq-client/java/#_consume

...