Symfony очереди сообщений с ключом привязки - стратегия повтора - PullRequest
2 голосов
/ 18 апреля 2020

Я внедряю мессенджер в компании, в которой я работаю. Я обнаружил проблему с ключом маршрутизации.

Я хочу отправить одно сообщение в две очереди. Два других приложения будут обрабатывать эти очереди. Все работает хорошо, но я обнаружил проблему, когда обработчик выдает исключение. Он удваивает сообщение, отправляя одну и две очереди на повторные попытки, поскольку очереди повторных попыток совпадают по ключу привязки, который одинаков для этих очередей.

Наконец, с 3 повторными попытками у меня 16 сообщений в моих dlqs. Не могли бы вы помочь мне с этой проблемой? Возможно ли создать стратегию повторения, основанную, может быть, на очереди, а не на ключе маршрутизации?

Моя конфигурация выглядит так:

messenger:
    failure_transport: failed
    default_bus: command.bus
    transports:
        async:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 2
                    max_delay: 0
                exchange:
                    name: olimp
                    type: topic
                queues:
                    create_miniature_v1:
                        binding_keys:
                            - first
                    create_miniature_v2:
                        binding_keys:
                            - first
        failed:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                exchange:
                    name: olimp_dead
                    type: topic
                queues:
                    create_miniature_v1_dlq:
                        binding_keys:
                            - first
                    create_miniature_v2_dlq:
                        binding_keys:
                            - first

    routing:
        'Olimp\Messenger\TestEvent': async

    buses:
        command.bus:
            middleware:
                - Olimp\Shared\Application\Message\Middleware\EventDispatcher
                - doctrine_close_connection
                - doctrine_transaction

        event.bus:
            default_middleware: allow_no_handlers

        query.bus: ~

Я отправляю событие с такой меткой:

class MessengerTestCommand extends Command
{
    protected static $defaultName = 'app:messenger-test';
    private MessageBusInterface $bus;

    public function __construct(MessageBusInterface $bus)
    {
        $this->bus = $bus;

        parent::__construct();
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $this->bus->dispatch(
            new TestEvent(), [
                new AmqpStamp('first')
            ]
        );

        $io->success('Done');

        return 0;
    }
}

Обработчик:

class TestEventHandler implements MessageHandlerInterface
{
    public function __invoke(TestEvent $event)
    {
        dump($event->id);

        throw new \Exception('Boom');
    }
}

То, что я нашел на кролике: Rabbit

Теперь я пытался настроить так:

framework:
    messenger:
        failure_transport: failed
        default_bus: command.bus
        transports:
            async:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
                        max_delay: 0
                    exchange:
                        name: olimp
                        type: topic
                    queues:
                        create_miniature_v1:
                            binding_keys:
                                - first
            async1:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
                        max_delay: 0
                    exchange:
                        name: olimp
                        type: topic
                    queues:
                        create_miniature_v2:
                            binding_keys:
                                - first
            failed:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    exchange:
                        name: olimp_dead
                        type: topic
                    queues:
                        create_miniature_v1_dlq:
                            binding_keys:
                                - first
                        create_miniature_v2_dlq:
                            binding_keys:
                                - first

        routing:
            'Olimp\Messenger\TestEvent': [async, async1]

и с двумя запущенными консольными командами:

bin/console messenger:consume async
bin/console messenger:consume async1

Но все работает одинаково.

1 Ответ

1 голос
/ 23 апреля 2020

Хорошо, я нашел ответ сам.

Я создал новую стратегию повторения. Я изменил queue_name_pattern на %routing_key%_%delay% и создал свой SendFailedMessageForRetryListener. Чтобы повторить конверт, я добавил штамп new AmqpStamp($envelope->last(AmqpReceivedStamp::class)->getQueueName()), который используется для создания правильного ключа маршрутизации для очереди задержки. Поэтому вместо создания очереди на основе имени для обмена я создал ее на основе имени очереди.

Еще две вещи:

Связывание ключей в очереди выглядит следующим образом:

queues:
    create_miniature_v1:
        binding_keys:
            - create_miniature_v1
            - first
    create_miniature_v2:
        binding_keys:
            - create_miniature_v2
            - first

и неудачные очереди:

queues:
    create_miniature_v1_dlq:
        binding_keys:
            - create_miniature_v1
    create_miniature_v2_dlq:
        binding_keys:
            - create_miniature_v2
...