Используйте Messenger, чтобы прочитать сообщение в очереди, не отправленное с Messenger - PullRequest
4 голосов
/ 08 апреля 2019

Я пытаюсь прочитать сообщение в очереди (в RabbitMQ), которое не было отправлено с Symfony Messenger.Похоже, что Messenger добавляет несколько заголовков, например

headers: 
    type: App\Message\Transaction

, но при чтении внешних сообщений этот заголовок не существует.

Итак, есть ли способ сообщить Messenger, что каждое сообщение в очередиА должен рассматриваться как тип сообщения Transaction?

Сегодня у меня есть:

framework:
    messenger:
        transports:
            # Uncomment the following line to enable a transport named "amqp"
            amqp:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages
                        type: direct
                    queue:
                        name: queue_messages

        routing:
            # Route your messages to the transports
             'App\Message\Transaction': amqp

, и я хотел бы добавить что-то вроде:

        routing:
            # Route your messages to the transports
             amqp: 'App\Message\Transaction'

1 Ответ

0 голосов
/ 09 апреля 2019

Райан Уивер ответил на аналогичный вопрос о слабости Symfony :

Вам потребуется специальный сериализатор для мессенджера, если сообщения не происходят из мессенджера:)

1) Вы создаете пользовательскую сериализацию (реализует SerializerInterface из Messenger) и настраиваете ее в конфигурации Messenger

2) Каким-то образом в этом сериализаторе вы берете JSON и превращаете его в какой-то объект «сообщения», который есть в вашем коде. Как вы делаете это на ваше усмотрение - вам нужно каким-то образом иметь возможность взглянуть на ваш JSON и выяснить, к какому классу сообщений он должен быть привязан. Затем вы можете создать этот объект вручную и заполнить данные или использовать сериализатор Symfony. Оберните это в Конверт, прежде чем вернуть его

3) Поскольку ваш сериализатор теперь возвращает объект «сообщение», если какой-то вид, Messenger использует свою обычную логику, чтобы найти обработчики для этого сообщения и выполнить их


Я быстро реализовал свои собственные задачи, чтобы вы соответствовали своей бизнес-логике :

1 - Создать Serializer, который реализует SerializerInterface:


   // I keeped the default serializer, and just override his decode method.

   /**
     * {@inheritdoc}
     */
    public function decode(array $encodedEnvelope): Envelope
    {
        if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
            throw new InvalidArgumentException('Encoded envelope should have at least a "body" and some "headers".');
        }

        if (empty($encodedEnvelope['headers']['action'])) {
            throw new InvalidArgumentException('Encoded envelope does not have an "action" header.');
        }

        // Call a factory to return the Message Class associate with the action
        if (!$messageClass = $this->messageFactory->getMessageClass($encodedEnvelope['headers']['action'])) {
            throw new InvalidArgumentException(sprintf('"%s" is not a valid action.', $encodedEnvelope['headers']['action']));
        }

        // ... keep the default Serializer logic

        return new Envelope($message, ...$stamps);
    }

2 - Извлечение правого Message с использованием фабрики:

class MessageFactory
{
    /**
     * @param string $action
     * @return string|null
     */
    public function getMessageClass(string $action)
    {
        switch($action){
            case ActionConstants::POST_MESSAGE :
                return PostMessage::class ;
            default:
                return null;
        }
    }
}

3) Настройте новый настраиваемый сериализатор для мессенджера:

framework:
  messenger:
    serializer: 'app.my_custom_serializer'

Я попытаюсь пойти немного дальше и найду способ «соединить» очередь напрямую, сообщу вам.

...