Мертвая надпись с php-amqplib и RabbitMQ? - PullRequest
0 голосов
/ 23 октября 2018

Я только начинаю использовать php-amqplib и RabbitMQ и хочу получить способ обработки сообщений, которые по какой-либо причине не могут быть обработаны и являются ненадежными.Я думал, что один из способов справиться с этим - это очередь мертвых писем.Я пытаюсь это настроить, но пока мне не повезло, и надеюсь, что кто-то может предложить некоторые предложения.

Моя инициализация очередей выглядит примерно так:

class BaseAbstract
{
    /** @var AMQPStreamConnection */
    protected $connection;
    /** @var AMQPChannel */
    protected $channel;
    /** @var array */
    protected $deadLetter = [
        'exchange' => 'dead_letter',
        'type' => 'direct',
        'queue' => 'delay_queue',
        'ttl' => 10000 // in milliseconds
    ];

    protected function initConnection(array $config)
    {
        try {
            $this->connection = AMQPStreamConnection::create_connection($config);
            $this->channel = $this->connection->channel();

            // Setup dead letter exchange and queue
            $this->channel->exchange_declare($this->deadLetter['exchange'], $this->deadLetter['type'], false, true, false);
            $this->channel->queue_declare($this->deadLetter['queue'], false, true, false, false, false, new AMQPTable([
                'x-dead-letter-exchange' => $this->deadLetter['exchange'],
                'x-dead-letter-routing-key' => $this->deadLetter['queue'],
                'x-message-ttl' => $this->deadLetter['ttl']
            ]));
            $this->channel->queue_bind($this->deadLetter['queue'], $this->deadLetter['exchange']);

            // Set up regular exchange and queue
            $this->channel->exchange_declare($this->getExchangeName(), $this->getExchangeType(), true, true, false);
            $this->channel->queue_declare($this->getQueueName(), true, true, false, false, new AMQPTable([
                'x-dead-letter-exchange' => $this->deadLetter['exchange'],
                'x-dead-letter-routing-key' => $this->deadLetter['queue']
            ]));

            if (method_exists($this, 'getRouteKey')) {
                $this->channel->queue_bind($this->getQueueName(), $this->getExchangeName(), $this->getRouteKey());
            } else {
                $this->channel->queue_bind($this->getQueueName(), $this->getExchangeName());
            }
        } catch (\Exception $e) {
            throw new \RuntimeException('Cannot connect to the RabbitMQ service: ' . $e->getMessage());
        }
        return $this;
    }

    // ...
}

я подумал, что следует настроить обмен и очередь недоставленных писем, а затем настроить обычный обмен и очередь (с помощью методов getRouteKey, getQueueName и getExchangeName / Type, предоставляемых расширяющимися классами)

Когда я пытаюсьобработать сообщение как:

public function process(AMQPMessage $message)
{
    $msg = json_decode($message->body);
    if (empty($msg->payload) || empty($msg->payload->run)) {
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true);
        return;
    }

    // removed for post brevity, but compose $cmd variable

    exec($cmd, $output, $returned);
    if ($returned !== 0) {
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    } else {
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
    }
}

Но я получаю сообщение об ошибке Something went wrong: Cannot connect to the RabbitMQ service: PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'delay_queue' in vhost '/': received 'dead_letter' but current is ''

Это способ, которым я должен установить мертвые буквы?Различные примеры, которые я видел во всем, похоже, показывают немного другой способ справиться с этим, ни один из которых, кажется, не работает для меня.Так что я здесь явно что-то не так понял и благодарен за любые советы.:)

1 Ответ

0 голосов
/ 07 ноября 2018

Настройка (постоянных) очередей и обменов - это то, что вы хотите сделать один раз , при развертывании кода, а не каждый раз, когда вы хотите их использовать.Думайте о них как о схеме вашей базы данных - хотя протокол предоставляет «объявлять», а не «создавать», вы, как правило, должны писать код, который предполагает, что все настроено определенным образом .Вы можете встроить первую часть своего кода в сценарий установки или использовать плагин управления на основе веб-интерфейса и интерфейса командной строки для управления ими в простом формате JSON.

Ошибка, с которой вы столкнулисьВидение, вероятно, является результатом попытки объявить одну и ту же очередь в разное время с разными параметрами - «объявление» не заменит или не перенастроит существующую очередь, оно будет рассматривать аргументы как «предварительные условия», которые необходимо проверить.Вам нужно будет отбросить и заново создать очередь или управлять ею через пользовательский интерфейс управления, чтобы изменить существующие параметры.

Когда объявления во время выполнения становятся более полезными, когда вы хотите динамически создавать предметы в вашем брокере.Вы можете либо дать им имена, которые, как вы знаете, будут уникальными для этой цели, либо передать null в качестве имени для получения случайно сгенерированного имени (люди иногда ссылаются на создание «анонимной очереди», но каждая очередь в RabbitMQ имеетимя, даже если вы его не выбрали).


Если я правильно читаю, ваша «схема» выглядит примерно так:

# Dead Letter eXchange and Queue
Exchange: DLX
Queue: DLQ; dead letter exchange: DLX, with key "DLQ"; automatic expiry
Binding: copy messages arriving in DLX to DLQ

# Regular eXchange and Queue
Exchange: RX
Queue: RQ; dead letter exchange: DLX, with key "DLQ"
Binding: copy messages from RX to RQ, optionally filtered by routing key

Когда сообщение"Nacked" в RQ, он будет передан в DLX с перезаписанным ключом маршрутизации, чтобы быть "DLQ".Затем он будет скопирован в DLQ.Если он извлечен из DLQ, или ждет в этой очереди слишком долго , он будет перенаправлен на себя.

Я бы упростил двумя способами:

  • Удалите обмен мертвыми буквами и TTL из «очереди мертвых писем» (которую я назвал DLQ);этот цикл может быть скорее запутанным, чем полезным.
  • Удалите опцию x-dead-letter-routing-key из обычной очереди (которую я назвал RQ).Конфигурация обычной очереди не должна знать, имеет ли Dead Exchange Exchange ноль, одну или несколько очередей, присоединенных к ней, поэтому она не должна знать имя этой другой очереди.Если вы хотите, чтобы прикрепленные сообщения направлялись прямо в одну очередь, просто сделайте это «разветвленным обменом» (который игнорирует ключи маршрутизации) или «обменом темами» с ключом привязки, установленным на # (который является подстановочным знаком, соответствующим всем ключам маршрутизации).

В качестве альтернативы можно задать x-dead-letter-routing-key для имени обычной очереди, т. Е. Указать, из какой очереди она поступила.Но до тех пор, пока у вас не будет сценария использования, я оставлю это простым и оставлю сообщение с его исходным ключом маршрутизации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...