PhpAmqpLib + RabbitMQ: как правильно установить приоритет задания? - PullRequest
0 голосов
/ 28 октября 2019

Я пытаюсь заставить RabbitMQ работать в многопользовательском сценарии с приоритетом сообщений.

Но в любом случае на рабочих я получаю неправильную последовательность, например, FIFO. Я установил 'x-max-priority', но все равно это не помогает. Я обнаружил, что вопрос с тем же регистром, но мне не помогает ни один ответ.

Объявление канала:

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$queue       = 'queue_name';
$maxPriority = 70;
$connection  = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel     = $connection->channel();

$channel->queue_declare(
    $queue,
    false,
    true,
    false,
    false,
    false,
    (new AMQPTable())->set('x-max-priority', $maxPriority)
);

Работник:

use PhpAmqpLib \Подключение \ AMQPStreamConnection;используйте PhpAmqpLib \ Message \ AMQPMessage;

$queue       = 'queue_name';
$maxPriority = 70;
$connection  = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel     = $connection->channel();

echo ' [*] Waiting for messages. To exit press CTRL+C'. PHP_EOL;
/** @var AMQPMessage $message */
$callback = static function($message) {
    echo ' [x] Received ' . $message->body . PHP_EOL;
    //echo ' Priority: ' . $message->get('priority') . PHP_EOL;

    sleep(substr_count($message->body, '.'));
    echo ' [x] Done' . PHP_EOL;
    $message->get('channel')->basic_ack($message->getDeliveryTag());
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queue, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

Генерация задачи:

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$queue       = 'queue_name';
$maxPriority = 70;
$connection  = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel     = $connection->channel();

$i = 0;
while ($i <= 500) {
    $priority = random_int(1, $maxPriority);
    $data     = "Message #{$i} Priority: {$priority}" . str_repeat('.', random_int(0, 2));
    $message  = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'priority' => $priority]);
    $channel->basic_publish($message, '', $queue);
    $i++;
}

$channel->close();
$connection->close();

Результаты выглядят так:

[x] Received Message #24 Priority: 27
[x] Done
[x] Received Message #25 Priority: 10
[x] Done
[x] Received Message #26 Priority: 35
[x] Done
[x] Received Message #27 Priority: 49
[x] Done
[x] Received Message #28 Priority: 7
[x] Done
[x] Received Message #29 Priority: 4.

Даже если работник будет перезапущен, ситуация будетто же самое.

Что я скучаю или делаю не так?

...