Laravel Горизонт по протоколу STOMP - PullRequest
0 голосов
/ 24 апреля 2020

Я делаю STOMP- PHP пакет для Laravel со встроенной поддержкой Horizon.

До сих пор я добавил обычную (не-Horizon) очередь в pu sh для моего экземпляра ActiveMq, и это работает. Я также хотел использовать Laravel Horizon, но добавление поддержки для него не работает полностью. Задания попадают в экземпляр Horizon, однако они всегда оказываются в состоянии ожидания, а не завершаются.

Какой метод завершает работу? Если я правильно понял, pop() - это метод, который должен получать экземпляры заданий из очереди и запускать их метод handle(), но мой метод pop(), когда дело касается Horizon, никогда не выполняется.

Вот код (do c -блоки удалены для краткости):

class StompQueue extends BaseStompQueue
{
    protected $lastPushed;

    public function readyNow($queue = null)
    {
        return $this->size($queue);
    }

    public function push($job, $data = '', $queue = null)
    {
        $this->lastPushed = $job;
        return parent::push($job, $data, $queue);
    }

    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $payload = (new JobPayload($payload))->prepare($this->lastPushed)->value;

        return tap(parent::pushRaw($payload, $queue, $options), function () use ($queue, $payload) {
            $this->event($this->getQueue($queue), new JobPushed($payload));
        });
    }

    public function later($delay, $job, $data = '', $queue = null)
    {
        $payload = (new JobPayload($this->createPayload($job, $queue, $data)))->prepare($job)->value;

        return tap(parent::pushRaw($payload, $queue, ['delay' => $this->secondsUntil($delay)]), function () use ($payload, $queue) {
            $this->event($this->getQueue($queue), new JobPushed($payload));
        });
    }

    public function pop($queue = null)
    {
        return tap(parent::pop($queue), function ($result) use ($queue) {
            if ($result instanceof StompJob) {
                $this->event($this->getQueue($queue), new JobReserved($result->getRawBody()));
            }
        });
    }

    public function deleteReserved($queue, $job)
    {
        $this->event($this->getQueue($queue), new JobDeleted($job, $job->getRawBody()));
    }

    protected function event($queue, $event)
    {
        if ($this->container && $this->container->bound(Dispatcher::class)) {
            $connection = $event->connection($this->getConnectionName());
            $this->container->make(Dispatcher::class)->dispatch(
                $connection->queue($queue)
            );
        }
    }
}

Рабочая конфигурация Horizon:

'environments' => [
    'local' => [
        'supervisor-1' => [
            'connection' => 'stomp',
            'queue' => ['default'],
            'balance' => 'simple',
            'processes' => 1,
            'tries' => 1,
        ],
    ],
],

Я попытался перевернуть * Свойство 1018 * находится здесь между redis и stomp (это имя моего драйвера), но, как я понял из кода Horizons, redis используется для внутренней работы Horizons и не должен изменяться как соединение супервизора. Как бы то ни было, оба варианта дают одинаковые результаты.

...