Я делаю STOMP- PHP пакет для Laravel со встроенной поддержкой Horizon.
До сих пор я добавил обычную (не-Horizon) очередь в pu sh для моего экземпляра ActiveMq, и это работает. Я также хотел использовать Laravel Horizon, но добавление поддержки для него не работает полностью. Задания попадают в экземпляр Horizon, однако они всегда оказываются в состоянии ожидания, а не завершаются.
Какой метод завершает работу? Если я правильно понял, pop()
- это метод, который должен получать экземпляры заданий из очереди и запускать их метод handle()
, но мой метод pop()
, когда дело касается Horizon, никогда не выполняется.
Вот код (do c -блоки удалены для краткости):
class StompQueue extends BaseStompQueue
{
protected $lastPushed;
public function readyNow($queue = null)
{
return $this->size($queue);
}
public function push($job, $data = '', $queue = null)
{
$this->lastPushed = $job;
return parent::push($job, $data, $queue);
}
public function pushRaw($payload, $queue = null, array $options = [])
{
$payload = (new JobPayload($payload))->prepare($this->lastPushed)->value;
return tap(parent::pushRaw($payload, $queue, $options), function () use ($queue, $payload) {
$this->event($this->getQueue($queue), new JobPushed($payload));
});
}
public function later($delay, $job, $data = '', $queue = null)
{
$payload = (new JobPayload($this->createPayload($job, $queue, $data)))->prepare($job)->value;
return tap(parent::pushRaw($payload, $queue, ['delay' => $this->secondsUntil($delay)]), function () use ($payload, $queue) {
$this->event($this->getQueue($queue), new JobPushed($payload));
});
}
public function pop($queue = null)
{
return tap(parent::pop($queue), function ($result) use ($queue) {
if ($result instanceof StompJob) {
$this->event($this->getQueue($queue), new JobReserved($result->getRawBody()));
}
});
}
public function deleteReserved($queue, $job)
{
$this->event($this->getQueue($queue), new JobDeleted($job, $job->getRawBody()));
}
protected function event($queue, $event)
{
if ($this->container && $this->container->bound(Dispatcher::class)) {
$connection = $event->connection($this->getConnectionName());
$this->container->make(Dispatcher::class)->dispatch(
$connection->queue($queue)
);
}
}
}
Рабочая конфигурация Horizon:
'environments' => [
'local' => [
'supervisor-1' => [
'connection' => 'stomp',
'queue' => ['default'],
'balance' => 'simple',
'processes' => 1,
'tries' => 1,
],
],
],
Я попытался перевернуть * Свойство 1018 * находится здесь между redis
и stomp
(это имя моего драйвера), но, как я понял из кода Horizons, redis используется для внутренней работы Horizons и не должен изменяться как соединение супервизора. Как бы то ни было, оба варианта дают одинаковые результаты.