У меня есть сценарий использования, когда моему приложению Django необходимо обрабатывать сообщения из внешнего источника.Celery поддерживает это с Пользовательскими пользователями сообщений , и эта часть работает нормально.
Тип обмена headers
, поэтому сообщение может направляться в разные очереди в зависимости от условий.Условия настраиваются через администратора Django.Из приведенного ниже кода вы можете видеть, например, что QueueItem может быть активирован / деактивирован в любое время.
Это работает при первой загрузке работника.На новом Кролике все обмены, очереди и маршруты создаются, и пользовательский потребитель правильно использует сообщения.
Проблема заключается в том, что пользователь добавляет новый элемент QueueItem или отображает активный статус существующего элемента QueueItem.Я могу динамически создавать новую очередь, но как бы я ни старался, пользовательский потребитель не потребляет ее.Я хотел бы, чтобы приложение:
- Перезапускало пользовательского потребителя (я пытался
app.control.broadcast('pool_restart')
) - Добавить очереди и использовать его, используя Custom Consumer.(Я попытался
app.control.add_consumer(...)
) - Изящно перезапустите всех рабочих, которые, когда я вручную работаю, чтобы подобрать новые настройки.
class MyConsumerStep(bootsteps.ConsumerStep):
alias = 'MyConsumerStep'
def get_queues(self):
from app.models import QueueItem
qs = QueueItem.objects.filter(active=True, mode=QueueItem.MODE_ASYNC, direction=QueueItem.DIRECTION_INBOUND)
return [Queue(q.name, Exchange('qbpubsub', 'headers'),
binding_arguments={
'var1': q.field1,
'var2': q.field2,
'x-match': 'any'
}) for q in qs]
def get_consumers(self, channel):
return [Consumer(channel,
queues=self.service_queues,
on_message=self.on_message,
accept=['json'])]
def on_message(self, message):
payload = message.decode()
print('Received message: {0!r} {props!r} rawlen={s}'.format(
payload, props=message.properties, s=len(message.body)
))
message.ack()
Я успешно создалновая очередьЯ успешно прикрепил его к потребителю сельдерея по умолчанию.Я хотел бы как-то присоединить его к пользовательскому потребителю, чтобы обратный вызов on_message
мог его обработать.
Мне кажется, что я довольно близок, но мне просто не хватает одной небольшой очевидной части API Celery / Kombu.