Сельдерей не публикует сообщение в очередь RabbitMQ без заданий - PullRequest
0 голосов
/ 17 февраля 2020

Я пытаюсь подключить серверную часть Django Rest Framework к RabbitMQ с помощью Celery. У меня есть несколько микро-сервисов, которые используют RabbitMQ в качестве шины сообщений между бэкэндом и этими сервисами. Когда у меня есть бэкэнд-вызов задачи, которая отправляет сообщение в шину сообщений микро-сервисов, сообщение никогда не помещается в очередь для захвата одной из этих сервисов. Очереди, которые я объявляю в файле tasks.py, создаются в RabbitMQ, но не используются ни одним из производителей.

testcelery / celery.py

...
from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "testcelery.settings")
app = Celery("testcelery")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

testcelery / settings.py

...

installed_apps = [
...
"backend_processing"
]

CELERY_BROKER_URL = "amqp://testuser@rabbitmq_host:5672"
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = "json"

backend_processing / tasks.py

...
from celery import shared_task
from testcelery.celery import app as celery_app
from kombu import Exchange, Queue

test_queue = Queue("test_queue", Exchange("test"), "test", durable=True)

@shared_task
def run_processing(data, producer=None):
    with celery_app.producer_or_acquire(producer) as producer:
        producer.publish(data,
                         declare=[test_queue],
                         retry=True,
                         exchange=test_queue.exchange,
                         routing_key=test_queue.routing_key,
                         delivery_mode="persistent")

Что мне нужно изменить в моей конфигурации разрешить сельдерею публиковать sh в очередях, отличных от тех, которые передаются сельдерею с помощью settings.py?

...