Я только начинаю использовать php-amqplib и RabbitMQ и хочу получить способ обработки сообщений, которые по какой-либо причине не могут быть обработаны и являются ненадежными.Я думал, что один из способов справиться с этим - это очередь мертвых писем.Я пытаюсь это настроить, но пока мне не повезло, и надеюсь, что кто-то может предложить некоторые предложения.
Моя инициализация очередей выглядит примерно так:
class BaseAbstract
{
/** @var AMQPStreamConnection */
protected $connection;
/** @var AMQPChannel */
protected $channel;
/** @var array */
protected $deadLetter = [
'exchange' => 'dead_letter',
'type' => 'direct',
'queue' => 'delay_queue',
'ttl' => 10000 // in milliseconds
];
protected function initConnection(array $config)
{
try {
$this->connection = AMQPStreamConnection::create_connection($config);
$this->channel = $this->connection->channel();
// Setup dead letter exchange and queue
$this->channel->exchange_declare($this->deadLetter['exchange'], $this->deadLetter['type'], false, true, false);
$this->channel->queue_declare($this->deadLetter['queue'], false, true, false, false, false, new AMQPTable([
'x-dead-letter-exchange' => $this->deadLetter['exchange'],
'x-dead-letter-routing-key' => $this->deadLetter['queue'],
'x-message-ttl' => $this->deadLetter['ttl']
]));
$this->channel->queue_bind($this->deadLetter['queue'], $this->deadLetter['exchange']);
// Set up regular exchange and queue
$this->channel->exchange_declare($this->getExchangeName(), $this->getExchangeType(), true, true, false);
$this->channel->queue_declare($this->getQueueName(), true, true, false, false, new AMQPTable([
'x-dead-letter-exchange' => $this->deadLetter['exchange'],
'x-dead-letter-routing-key' => $this->deadLetter['queue']
]));
if (method_exists($this, 'getRouteKey')) {
$this->channel->queue_bind($this->getQueueName(), $this->getExchangeName(), $this->getRouteKey());
} else {
$this->channel->queue_bind($this->getQueueName(), $this->getExchangeName());
}
} catch (\Exception $e) {
throw new \RuntimeException('Cannot connect to the RabbitMQ service: ' . $e->getMessage());
}
return $this;
}
// ...
}
я подумал, что следует настроить обмен и очередь недоставленных писем, а затем настроить обычный обмен и очередь (с помощью методов getRouteKey, getQueueName и getExchangeName / Type, предоставляемых расширяющимися классами)
Когда я пытаюсьобработать сообщение как:
public function process(AMQPMessage $message)
{
$msg = json_decode($message->body);
if (empty($msg->payload) || empty($msg->payload->run)) {
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true);
return;
}
// removed for post brevity, but compose $cmd variable
exec($cmd, $output, $returned);
if ($returned !== 0) {
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} else {
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
}
}
Но я получаю сообщение об ошибке Something went wrong: Cannot connect to the RabbitMQ service: PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'delay_queue' in vhost '/': received 'dead_letter' but current is ''
Это способ, которым я должен установить мертвые буквы?Различные примеры, которые я видел во всем, похоже, показывают немного другой способ справиться с этим, ни один из которых, кажется, не работает для меня.Так что я здесь явно что-то не так понял и благодарен за любые советы.:)