Symfony5 messenger, параллельные очереди для одинаковых обработчиков сообщений - PullRequest
0 голосов
/ 19 июня 2020

Symfony мессенджер:

https://symfony.com/doc/current/messenger.html

Проблема:

Pool # 1 = (user1 создает Job , Job делится на 10 мессенджеров Message) Пул # 2 = (user2 создает Job, Job делится на 10 мессенджеров Message) ... Пул # 100 = (user100 создает Job, Job делится на 10 мессенджеров Message)

Пул # 100 не будет выполняться, пока все предыдущие пулы не будут завершены.

Цель:

Мне нужны параллельные очереди, чтобы все пулы запускались отдельно, поэтому каждый пул будет иметь личную очередь.

Пример кода:

config / packages / messenger.yaml
framework:
    messenger:
        transports:
            sync: "%env(MESSENGER_TRANSPORT_DSN)%"
        routing:
            'App\Message\Job': sync
src / Message / Job. php
<?php

namespace App\Message;

class Job
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}
src / MessageHandler / JobHandler. php
<?php

namespace App\MessageHandler;

use App\Message\Job;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class JobHandler implements MessageHandlerInterface
{
    public function __construct()
    {}

    public function __invoke(Job $message)
    {
        $params = json_decode($message->getContent(), true);
        dump($params);
    }
}
src / Controller / JobController. php
<?php

namespace App\Controller;

use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;

/**
 * @Route("/job")
 */
class JobController extends AbstractController
{
    /**
     * @Route("/create", name="app_job_create")
     * @param Request $request
     * @param MessageBusInterface $bus
     * @return JsonResponse
     */
    public function create(Request $request, MessageBusInterface $bus): JsonResponse
    {
        // ...
        $entityId = $entity->getId();
        // ...

        for ($i = 0; $i < 10; $i++) {
            $params['entityId'] = $entityId;
            $params['counter'] = $i;
            $bus->dispatch(new Job(json_encode($params)));
        }

        return new JsonResponse([]);
    }
}

Подробнее:

Я хотел бы продолжать использовать это, но не могу найти простейшего решения для передачи какого-либо уникального имени очереди или идентификатора, которые затем говорят работнику, что он должен обрабатывать только этот пул Messages. Я нашел нестандартные транспорты https://symfony.com/doc/current/messenger/custom-transport.html, но не уверен, что это может помочь. По крайней мере, я считаю, что одного таможенного транспорта недостаточно. И я читал о Actor models https://www.brianstorti.com/the-actor-model/, но я бы хотел использовать только Messenger + Redis, если возможно.

Вероятно, здесь нет решения, и этот мессенджер не может обрабатывать параллельные очереди все же. В любом случае я рад любой помощи. Спасибо!

1 Ответ

0 голосов
/ 10 июля 2020

В итоге я решил эту проблему, используя имена очереди динамических c. К сожалению, мне пришлось отказаться от мессенджера.

Также SDK для RabbitMQ https://github.com/php-amqplib/RabbitMqBundle в настоящее время не поддерживается для symfony 5 (июнь 2020 г.), я не уверен на 100%, а вот на 3й версии использовал, а на 5ой поставить не смог. Поэтому я использовал другой.

Вот хорошее краткое руководство https://blog.programster.org/rabbitmq-job-queue-with-php В примере я изменил RABBITMQ_QUEUE_NAME на имя Dynami c, и он работал нормально.

Затем все как обычно, поднимите RabbitMQ и настройте супервизор https://symfony.com/doc/current/messenger.html#supervisor -конфигурация

Буду рад, если это кому-то поможет, спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...