RabbitMQ хранит сообщения в обмен - PullRequest
2 голосов
/ 22 марта 2019

Я пытаюсь выяснить, возможно ли хранить сообщения в обмене RabbitMQ, даже когда не работает ни один потребитель.

Я понял (возможно, неправильно), что для достижения этого обмен должен быть "долговечным"«а также очередь и сообщение должны быть отправлены с« постоянным »флагом

'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT

Моя главная цель - сохранить все сообщения в обмене так, чтобы в случаепричина, по которой ни один потребитель не запускается, когда я запускаю его, все сообщения в обмене могут быть направлены в связанную очередь.Я объявляю свои обмены и очереди следующим образом:

//Sender.php
public function sendToQueue(ActionMessage $message)
    {
        $headers = [
            'content-type' => 'application/json',
            'timestamp' => $message->getCreatedAt()->getTimestamp(),
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ];
        $channel = $this->connection->getChannel();
        $channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
        $qMessage = new AMQPMessage(json_encode($message->toArray()), $headers);
        $channel->basic_publish($qMessage, $this->exchangeName, $message->getTopic());
        return true;
    }
//Receiver.php
public function consume($callbackFunction)
        {
            $channel = $this->messenger->getChannel();
            $channel->exchange_declare($this->exchange, 'direct', false, true, false);
            list($queueName, ,) = $channel->queue_declare('', false, true, true, false);
            $channel->queue_bind($queueName, $this->exchangeName, $this->topicAction);

            $channel->basic_consume($queueName, '', false, true, false, false, $callbackFunction);

            while (count($channel->callbacks)) {
                $channel->wait();
            }

            $channel->close();
            $this->messenger->close();
        }

Буду признателен за любую помощь (даже просто чтобы отказаться от идеи и вставить некоторое пространство между ними).Спасибо.

1 Ответ

0 голосов
/ 26 марта 2019

Обмен не хранит сообщения, то есть работу очереди.Проблема не в том, что потребители не работают, а в том, что очередей не существует, потому что вы оставили потребителям объявлять свои собственные очереди.

Если вы хотите, чтобы сообщения оставались постоянными до тех пор, пока потребитель не получит их, вы должны объявить:

  • Обмен, который «Отправитель» опубликует в
  • Именованная очередь, присоединенная к этому.обмен для каждого типа сообщений, которые будут использоваться отдельно (по одному на каждый ключ маршрутизации, если используется direct exchange)

Оба они могут быть объявлены в сценарии отправителя, но в большинстве случаев этоимеет смысл объявить их один раз при развертывании приложения, обрабатывая их так же, как схему базы данных.

Вместо создания анонимной очереди в сценарии Receiver, вы можете просто присоединиться к указанной очереди, иначать получать сообщения, ожидающие там.

Основное различие, которое это будет иметь, состоит в том, как несколько потребителей для одного и того же ключа маршрутизации будут взаимодействоватьact:

  • Несколько очередей, подключенных к одному обмену, как в существующем коде, создать несколько копий каждого сообщения.Это полезно, если разные потребители делают разные вещи с одними и теми же сообщениями.
  • Несколько потребителей, подключенных к одной очереди, как я уже говорил выше, будут делиться сообщениями скаждый обрабатывается другим потребителем по существу наугад.Это полезно, если у вас есть несколько идентичных потребителей, чтобы иметь дело с большим количеством сообщений.

* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 1033.* Вы можете обнаружить, что вам действительно нужна смесь:

  • Предварительно объявите очередь для каждого потребителя, которая должна видеть каждое сообщение , чтобы обеспечить сохранение копии каждого сообщения до этот конкретный потребитель готов к его чтению.
  • Объявите дополнительные временные очереди в дополнительных потребителях для получения дополнительной копии сообщений по мере их поступления.

В заключение отметим, что в RabbitMQ есть два механизма возврата к различной обработке сообщений, которые не могут быть обработаны:

  • An Альтернативный Exchange перехватывает сообщения, которые будут отброшены из обмена (поскольку нет соответствующей очереди).
  • A Dead Letter Exchange перехватывает сообщенияd будет удален из очереди (например, из-за того, что он был отклонен потребителем или достиг заданного времени ожидания).

AE может быть полезно в вашем примере, если вы нена самом деле вы не хотите нормально обрабатывать пропущенные сообщения, вы просто хотите их обнаружить, например, занести их в журнал ошибок.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...