Задержка сообщения в RabbitMQ - PullRequest
       7

Задержка сообщения в RabbitMQ

30 голосов
/ 15 декабря 2010

Можно ли отправить сообщение через RabbitMQ с некоторой задержкой? Например, я хочу завершить сеанс клиента через 30 минут и отправить сообщение, которое будет обработано через 30 минут.

Ответы [ 7 ]

12 голосов
/ 22 мая 2012

С выпуском RabbitMQ v2.8 запланированная доставка теперь доступна, но в качестве косвенной функции: http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

12 голосов
/ 07 апреля 2017

Существует два подхода, которые вы можете попробовать:

Старый подход: Установить заголовок TTL (время жизни) в каждом сообщении / очереди (политика), а затем ввести DLQ для обработкиЭто.по истечении времени ttl ваши сообщения будут перемещаться из DLQ в основную очередь, чтобы ваш слушатель мог их обработать.

Последний подход: Недавно RabbitMQ разработал RabbitMQ модуль отложенных сообщений , с помощью которого вы можете добиться того же самого, и эта поддержка плагина доступна начиная с RabbitMQ-3.5.8.

Вы можете объявить обмен с типом x-delayed-message, а затем опубликовать сообщения с пользовательским заголовком x-задержка, выраженная в миллисекундах время задержки для сообщения.Сообщение будет доставлено в соответствующие очереди после x-delay миллисекунд

byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new 
AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

Подробнее здесь: git

8 голосов
/ 13 апреля 2016

Благодаря ответу Нормана я смог реализовать его в NodeJS.

Все довольно ясно из кодаНадеюсь, это сэкономит кому-то время.

var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});

// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
      deadLetterExchange: "my_final_delayed_exchange",
      messageTtl: 5000, // 5sec
}, function (err, q) {
      ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
      ch.bindQueue(q.queue, "my_final_delayed_exchange", '');

      ch.consume(q.queue, function (msg) {
          console.log("delayed - [x] %s", msg.content.toString());
      }, {noAck: true});
});
6 голосов
/ 24 января 2014

Похоже, в этом блоге описывается использование обмена мертвыми буквами и сообщения ttl для выполнения чего-то подобного.

В приведенном ниже коде используются CoffeeScript и Node.JS для доступа к Rabbit и реализации чего-либопохоже.

amqp   = require 'amqp'
events = require 'events'
em     = new events.EventEmitter()
conn   = amqp.createConnection()

key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
  conn.queue key, {
    arguments:{
      "x-dead-letter-exchange":"immediate"
    , "x-message-ttl": 5000
    , "x-expires": 6000
    }
  }, ->
    conn.publish key, {v:1}, {contentType:'application/json'}

  conn.exchange 'immediate'

  conn.queue 'right.now.queue', {
      autoDelete: false
    , durable: true
  }, (q) ->
    q.bind('immediate', 'right.now.queue')
    q.subscribe (msg, headers, deliveryInfo) ->
      console.log msg
      console.log headers
6 голосов
/ 13 февраля 2013

Поскольку у меня недостаточно репутации, чтобы добавить комментарий, выкладываю новый ответ.Это просто дополнение к тому, что уже обсуждалось на http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

За исключением того, что вместо установки ttl для сообщений вы можете установить его на уровне очереди.Также вы можете избежать создания нового обмена только для перенаправления сообщений в другую очередь.Вот пример кода Java:

Производитель:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;

public class DelayedProducer {
    private final static String QUEUE_NAME = "ParkingQueue";
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-message-ttl", 10000);
        arguments.put("x-dead-letter-exchange", "");
        arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME );
        channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);

        for (int i=0; i<5; i++) {
            String message = "This is a sample message " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("message "+i+" got published to the queue!");
            Thread.sleep(3000);
        }

        channel.close();
        connection.close();
    }
}

Потребитель:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {
   private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        boolean autoAck = false;
        channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }

    }
}
5 голосов
/ 23 февраля 2011

В настоящее время это невозможно. Вы должны хранить свои метки времени истечения в базе данных или что-то подобное, а затем иметь вспомогательную программу, которая считывает эти метки времени и помещает в очередь сообщение.

Задержанные сообщения являются часто запрашиваемой функцией, поскольку они полезны во многих ситуациях. Однако, если вам нужно закончить сеансы клиентов, я считаю, что обмен сообщениями не является идеальным решением для вас, и что другой подход может работать лучше.

0 голосов
/ 13 февраля 2014

Предположим, у вас есть контроль над потребителем, вы можете добиться задержки для потребителя следующим образом ??:

Если мы уверены, что n-е сообщение в очереди всегда имеет меньшую задержку, чем n + 1-е сообщение (это может быть верно для многих случаев использования): производитель отправляет timeInformation в задаче, передавая время, в которое это задание необходимо быть выполненным (currentTime + delay). Потребитель:

1) Считывает запланированное время из задачи

2) если текущее время> запланированное время.

Иначе задержка = запланированное время - текущее время

сон в течение времени, обозначенного задержкой

Потребитель всегда настроен с параметром параллелизма. Таким образом, другие сообщения будут просто ждать в очереди, пока потребитель не завершит работу. Таким образом, это решение могло бы работать хорошо, хотя и выглядит неловко, особенно из-за больших задержек.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...