Получите размер очереди от обратного вызова потребителя rabbitmq с PhpAmqpLib - PullRequest
1 голос
/ 15 апреля 2019

Я хочу регистрировать рабочее состояние по рабочим обратным вызовам и включать количество оставленных сообщений в очередь.

Единственное решение, которое я нашел до сих пор, это получение второго члена массива результатов queue_declare, но его нужно вызывать один раз за запуск рабочего процесса, и мне нужно обновлять информацию при каждом новом сообщении.

UPD : Решение основано на ответе IMSoP :

<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('test1');
echo "[*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) use ($channel) {
    list (, $cn) = $channel->queue_declare('test1', true);
    echo ' [x] Received ', $msg->body, " $cn left";
    for ($i = 0; $i < $msg->body; ++$i) {
        sleep(1);
        echo '.';
    }
    echo "\n";
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('test1', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
    $channel->wait();
}

По некоторым причинам всегда дает 0 в качестве количества сообщений.

1 Ответ

1 голос
/ 15 апреля 2019

В методе queue_declare есть параметр с именем «пассивный», который можно использовать для этой цели: он проверяет, существует ли очередь только по имени, и игнорирует любые другие параметры.

Согласно документации AMQP :

Если установлено, сервер ответит объявлением-ОК, если очередь уже существует с таким именем, и выдаст ошибку, если нет. Клиент может использовать это, чтобы проверить, существует ли очередь без изменения состояния сервера. Если установлено, все остальные поля метода, кроме name и no-wait, игнорируются. Объявление с пассивным и без ожидания не имеет никакого эффекта. Аргументы сравниваются на предмет семантической эквивалентности.

Обратите внимание, что Declare-Ok - это не просто статус, а имя структура полного ответа с полями queue, message-count и consumer-count.

В PHP-AMQPLib вы можете использовать это для регистрации состояния набора очередей примерно так:

foreach ( $this->registeredQueues as $queueName ) {
    // The second parameter to queue_declare is $passive
    // When set to true, everything else is ignored, so need not be passed
    list($queueName, $messageCount, $consumerCount)
        = $this->rabbitChannel->queue_declare($queueName, true);

    $this->logger->info(
        "Queue $queueName has $messageCount messages and $consumerCount active consumers."
    );
}
...