Как заставить работника Celery использовать «внешнюю» очередь RabbitMQ? - PullRequest
0 голосов
/ 18 сентября 2018

У меня есть следующие сценарии:

celery_tasks.py

from celery import Celery
app = Celery(broker='amqp://guest:guest@localhost:5672//')
app.conf.task_default_queue = 'test_queue'

@app.task(acks_late=True)
def test(a):
   return a

publish.py

from celery_tasks import test
test.delay('abc')

Когда я запускаю publish.py и запускаю работника (celery -a работник celery_tasks --loglevel = DEBUG), контент 'abc' публикуется в 'test_queue' и используется работником.

Есть ли у работника способ извлечь что-то из очередичто не было размещено в сельдерее?Например, когда я помещаю что-то в test_queue прямо через RabbitMQ, не проходя через издателя Celery, и запускаю работника Celery, он дает мне следующее предупреждение:

WARNING / MainProcess] Получено и удаленонеизвестное сообщениеНеправильное назначение?!?

Полное содержимое тела сообщения было следующим: body: 'abc' (3b)

{content_type: None content_encoding: None delivery_info: {'exchange': '', 'redelivered': False, 'delivery_tag': 1, 'consumer_tag': 'None2', 'routing_key': 'test_queue'} headers = {}}

Есть ли способ решить эту проблему

1 Ответ

0 голосов
/ 22 сентября 2018

Сельдерей имеет определенный формат и набор заголовков, которые необходимо поддерживать, чтобы соответствовать ему.Поэтому вам придется перепроектировать его, чтобы сделать совместимое с сельдереем сообщение, не производимое сельдереем.Имейте в виду, что на самом деле celery не предназначен для отправки сообщений через брокера, а для отправки задач, которые являются расширенными сообщениями, поэтому имеют дополнительные функции в заголовочной части сообщения amqp

...