Symfony мессенджер:
https://symfony.com/doc/current/messenger.html
Проблема:
Pool # 1 = (user1
создает Job
, Job
делится на 10 мессенджеров Message
) Пул # 2 = (user2
создает Job
, Job
делится на 10 мессенджеров Message
) ... Пул # 100 = (user100
создает Job
, Job
делится на 10 мессенджеров Message
)
Пул # 100 не будет выполняться, пока все предыдущие пулы не будут завершены.
Цель:
Мне нужны параллельные очереди, чтобы все пулы запускались отдельно, поэтому каждый пул будет иметь личную очередь.
Пример кода:
config / packages / messenger.yaml
framework:
messenger:
transports:
sync: "%env(MESSENGER_TRANSPORT_DSN)%"
routing:
'App\Message\Job': sync
src / Message / Job. php
<?php
namespace App\Message;
class Job
{
private $content;
public function __construct(string $content)
{
$this->content = $content;
}
public function getContent(): string
{
return $this->content;
}
}
src / MessageHandler / JobHandler. php
<?php
namespace App\MessageHandler;
use App\Message\Job;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class JobHandler implements MessageHandlerInterface
{
public function __construct()
{}
public function __invoke(Job $message)
{
$params = json_decode($message->getContent(), true);
dump($params);
}
}
src / Controller / JobController. php
<?php
namespace App\Controller;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;
/**
* @Route("/job")
*/
class JobController extends AbstractController
{
/**
* @Route("/create", name="app_job_create")
* @param Request $request
* @param MessageBusInterface $bus
* @return JsonResponse
*/
public function create(Request $request, MessageBusInterface $bus): JsonResponse
{
// ...
$entityId = $entity->getId();
// ...
for ($i = 0; $i < 10; $i++) {
$params['entityId'] = $entityId;
$params['counter'] = $i;
$bus->dispatch(new Job(json_encode($params)));
}
return new JsonResponse([]);
}
}
Подробнее:
Я хотел бы продолжать использовать это, но не могу найти простейшего решения для передачи какого-либо уникального имени очереди или идентификатора, которые затем говорят работнику, что он должен обрабатывать только этот пул Messages
. Я нашел нестандартные транспорты https://symfony.com/doc/current/messenger/custom-transport.html, но не уверен, что это может помочь. По крайней мере, я считаю, что одного таможенного транспорта недостаточно. И я читал о Actor models
https://www.brianstorti.com/the-actor-model/, но я бы хотел использовать только Messenger + Redis, если возможно.
Вероятно, здесь нет решения, и этот мессенджер не может обрабатывать параллельные очереди все же. В любом случае я рад любой помощи. Спасибо!