Как добавить очередь к пользовательскому получателю сообщений во время выполнения - PullRequest
0 голосов
/ 13 февраля 2019

У меня есть сценарий использования, когда моему приложению Django необходимо обрабатывать сообщения из внешнего источника.Celery поддерживает это с Пользовательскими пользователями сообщений , и эта часть работает нормально.

Тип обмена headers, поэтому сообщение может направляться в разные очереди в зависимости от условий.Условия настраиваются через администратора Django.Из приведенного ниже кода вы можете видеть, например, что QueueItem может быть активирован / деактивирован в любое время.

Это работает при первой загрузке работника.На новом Кролике все обмены, очереди и маршруты создаются, и пользовательский потребитель правильно использует сообщения.

Проблема заключается в том, что пользователь добавляет новый элемент QueueItem или отображает активный статус существующего элемента QueueItem.Я могу динамически создавать новую очередь, но как бы я ни старался, пользовательский потребитель не потребляет ее.Я хотел бы, чтобы приложение:

  1. Перезапускало пользовательского потребителя (я пытался app.control.broadcast('pool_restart'))
  2. Добавить очереди и использовать его, используя Custom Consumer.(Я попытался app.control.add_consumer(...))
  3. Изящно перезапустите всех рабочих, которые, когда я вручную работаю, чтобы подобрать новые настройки.
class MyConsumerStep(bootsteps.ConsumerStep):
    alias = 'MyConsumerStep'

    def get_queues(self):
        from app.models import QueueItem
        qs = QueueItem.objects.filter(active=True, mode=QueueItem.MODE_ASYNC, direction=QueueItem.DIRECTION_INBOUND)
        return [Queue(q.name, Exchange('qbpubsub', 'headers'),
                      binding_arguments={
                          'var1': q.field1,
                          'var2': q.field2,
                          'x-match': 'any'
                      }) for q in qs]

    def get_consumers(self, channel):
        return [Consumer(channel,
                         queues=self.service_queues,
                         on_message=self.on_message,
                         accept=['json'])]

    def on_message(self, message):
        payload = message.decode()
        print('Received message: {0!r} {props!r} rawlen={s}'.format(
            payload, props=message.properties, s=len(message.body)
        ))
        message.ack()

Я успешно создалновая очередьЯ успешно прикрепил его к потребителю сельдерея по умолчанию.Я хотел бы как-то присоединить его к пользовательскому потребителю, чтобы обратный вызов on_message мог его обработать.

Мне кажется, что я довольно близок, но мне просто не хватает одной небольшой очевидной части API Celery / Kombu.

...