RabbitMq Неподтвержденные сообщения после Consumer basic_cancel - PullRequest
0 голосов
/ 29 августа 2018

Хорошо, не вдаваясь в подробности всей системы, которую я настроил,

Проблема, с которой я столкнулся, заключается в том, что когда потребители отменяют (AMQPChannel-> basic_cancel) прослушивание очереди, он оставляет одно дополнительное сообщение неподтвержденным этим работником. Он также не запускает нормальный обратный вызов для обработки этого сообщения.

Несколько подробностей

  • Очереди блокируются (используется ожидание) while(count($channel->callbacks)) { $channel->wait( ... ) ... }
  • Предварительная выборка равна 1, наименьшее, что вы можете иметь
  • Потребители могут динамически слушать (AMQPChannel->basic_consume)
  • Потребители могут динамически забывать (AMQPChannel->basic_cancel)

Я не пойду точно так, как говорю конкретному потребителю потреблять или отменять очередь выдачи. Но все работает отлично, начало потребления или отмены просто отлично. Но когда я отменяю очередь, в которой все еще есть сообщения, они просто забывают об этом последнем сообщении, и я полагаю, что отмена удаляет обратный вызов для этой очереди, и поэтому нет способа вернуть это сообщение, за исключением убийства потребителя, что нежелательно.

Я сделал некоторую отладку (просто пример, а не то, что я на самом деле делаю)

   Debug::dump($this->getAmqpChannel()->getMethodQueue());
   $tag = $this->_tags[$queue]; //I keep track of the consumer tag on a queue by queue basis, $queue == {queuename} below
   $this->getAmqpChannel()->basic_cancel( $tag );
   Debug::dump($this->getAmqpChannel()->getMethodQueue());

Вывод это примерно

  array()
  RunCommand: basic_cancel //this works fine consumer forgets queue except ->
  array(1){
    [0] => array(3){
        [0] => string(5) "60,60",
        [1] => string(114) "amq.ctag-D9om-gD_cPevzeon52zpig\0\0\0\0\0\0\0\0\0G{queuename}",  //{queuename} is the name of the queue, which is based on clients information I cant share (such as their name)
       [2] => object(PhpAmqpLib\Message\AMQPMessage)#0 (9) {
            ["DELIVERY_MODE_NON_PERSISTENT":constant] => int(1),
            ["DELIVERY_MODE_PERSISTENT":constant] => int(2),
            ["body":public] => string(1358647) "{ ... "correlation_id":32,"max_correlation_id":38}"
            ["body_size":public] => int(135864),
            ["is_truncated":public] => bool(false),
            ["content_encoding":public] => null,
            ["propertyDefinitions":protected static] => array(14){ ... }
            ["delivery_info":public] => array(0){},
            ["prop_types":protected] => array(14){ ... }
      }
    }

Как только рабочий умирает (или, скорее, я его убиваю), сообщение помещается обратно в очередь, и я могу вытащить его в компоненте управления RabbitMq (плагин) в разделе сообщений get. И вот оно,

  Properties
    correlation_id: 32:38
    delivery_mode:  2
    content_encoding:   text/plain
    content_type:   application/json

"correlation_id":32,"max_correlation_id":38 соответствует correlation_id: 32:38, потому что мне нужно отслеживать части сообщения. Итак, я знаю, что это то же самое сообщение.

Так почему же после отмены я получаю последнее сообщение, застрявшее в стране зомби, и в любом случае оно может выкинуть его обратно в очередь, за исключением убийства потребителя.

Кроме того, это не разовое, это происходит каждый раз, когда я отменяю очередь, в которой все еще есть сообщения. Так что это не имеет ничего общего с данным сообщением. Похоже, что он получает одно последнее предпочтительное сообщение, а затем, поскольку оно отменено, обратного вызова для последнего не выполняется, и он просто застрял в подвешенном состоянии. Помните, что 0 предварительная выборка - выборка всех сообщений, 1 - самое низкое значение, которое вы можете установить.

Любая помощь может быть великолепной.

UPDATE

У меня может быть решение, позвонив по номеру

 $this->getAmqpChannel()->basic_recover(true); //basic_recover($requeue)

Либо непосредственно перед, либо после basic_cancel

Это отклоняет сообщение, и я даже могу проверить $this->getAmqpChannel()->getMethodQueue(), как показано выше, чтобы увидеть, если $queue, который я отменяю, содержит сообщение, которое было собрано (еще не реализовано).

Я пытался избежать использования recover, но я думаю, что все должно быть в порядке, потому что потребители используют один канал и блокируют, и в худшем случае это просто отклонит правильное сообщение 1 раз, что, хотя и не идеально, должно быть приемлемым .

Однако в некоторых случаях я получаю дополнительное исключение от Кролика,

  PRECONDITION_FAILED - unknown delivery tag {n}

Если у кого-нибудь есть какие-либо подробности об этой дополнительной ошибке, это было бы здорово. Также все очереди требуют подтверждения, ни одна из них не является автоматической.

Update1

Я заметил в трассировке стека queue_unbind, поэтому то, что я сделал, это внутреннее отслеживание привязок, таким образом, я могу гарантировать, что открепление выполняется только один раз. Я опубликую некоторый код после того, как завтра проведу еще какое-то тестирование, но мое первоначальное тестирование после реализации, которое больше не выдает ошибку.

Все это может показаться странным, и я могу объяснить, почему и что я делаю со всем этим, но это, вероятно, выходит за рамки вопроса. Я скажу, что я использовал эту систему в производстве более 2 лет (я разработал ее), и мы можем выполнять 180 тыс. Запросов в минуту (около 100, если учитывать все части системы). Мы также сделали более 280 миллиардов поисковых запросов, и я это сделал. Мы также являемся ведущей компанией в нашей отрасли, которая либо устранила наших конкурентов, либо они прислали нам свои вещи и больше не занимаются этим самостоятельно. Это во многом из-за нашего быстрого поворота, а также качества наших данных. Так что эта система работает и работает очень хорошо.

Но в недавних аудитах я заметил, что потребителям Daily приходится иметь дело только с примерно 10 миллионами строк (примерно 100 минут работы), в то время как ночные потребители имеют дело примерно с 100 миллионами строк (или около 20 часов работы). Ежедневные потребители могут выполнять ночные задания, но только в нерабочее время (поскольку это сокращает время отклика в течение дня), поэтому существует примерно 10-часовое окно, в котором ночные задания выполняются только на гораздо меньшем и менее работоспособном сервере. Это решение дает нам то, что если нет ежедневных заданий (заданий, отправленных клиентами), они могут динамически переключаться на ночные данные (хранилища данных) на лету. Это должно сохранить большую часть оперативности, при этом не тратя ресурсы, когда никакие задания не отправляются. Мы можем по горизонтали масштабировать столько, сколько мы хотим в поисках, но мы платим много за наш главный сервер и тратим около 8 часов работы, которую мы могли бы сделать.

Возможно, я мог бы заполнить небольшую книгу тем, как все это работает, но, надеюсь, это даст некоторое представление о том, что я делаю. Я также связан некоторыми пунктами о неразглашении и неконкурентоспособности моего контракта, поэтому я действительно могу вникнуть в конкретные детали.

1 Ответ

0 голосов
/ 29 августа 2018

Consumer во фразеологии RabbitMQ означает подписчика в очереди. (См. этот ответ для получения подробной информации о различиях между каналом, потребителем и соединением).

Когда вы включаете подтверждения, они включаются для channel. Любые сообщения, доставляемые по этому каналу, будут иметь тег доставки , связанный с ними. Когда вы закончите обработку сообщения, вам нужно сообщить серверу по тому же каналу, что сообщение было обработано. Отмена потребителя не влияет на подтверждение уже доставленных сообщений. Фактически, это был бы совершенно правильный вариант использования для получения сообщения, отмены потребителя, обработки сообщения и отправки подтверждения.

Таким образом, у вас есть два варианта. Вы можете оставить сообщение без подтверждения, и в этом случае все, что вам нужно сделать, это закрыть канал, и оно будет помещено в очередь в начале очереди. Или вы можете подтвердить его (nack или ack), и в этом случае сообщение будет помещено в очередь, если nack, или сброшено, если ack.

Если я правильно помню, НЕ указание счетчика предварительной выборки (через basic.qos) приведет к предварительной выборке, равной нулю, что означает, что вы должны подтвердить предыдущее сообщение перед получением следующего сообщения. Я могу ошибаться в этом. Конечно, если вы используете basic.get, вы избежите этой проблемы в целом с очень небольшим влиянием на производительность.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...