Я устанавливаю задачу в Celery, чтобы она «потребляла» определенную тему обмена. Когда я отправляю сообщение бирже, о которой идет речь, я получаю сообщение об ошибке: «Получено и удалено неизвестное сообщение. Неправильный адресат?!?» на консоли сельдерея.
Я создал отдельную папку проекта, чтобы повторить проблему, где все называется test-что-то со следующей структурой:
celery-test/
L celery.py
L celeryconfig.py
L tasks.py
Я видел различные вопросы о StackOverflow и GitHub, относящиеся к пакету librabbitmq. Решением здесь было бы удалить этот пакет, но я даже не установил его, так что это ни к чему не привело. Некоторые из найденных вопросов / проблем, которые предлагают это решение:
- https://github.com/celery/celery/issues/3675
- Celery & Rabbitmq: WARNING / MainProcess] Получено и удалено неизвестное сообщение. Неправильный пункт назначения?!? - эксперимент на GIT
Я также пытался поиграть с настройками маршрутизации задач, так как полагаю, что проблема лежит там, но я не могу заставить ее работать.
Для всех, кто интересуется, почему порт отключен на 1, это потому, что он указывает на rabbitmq в моем док-контейнере, который больше не может использовать 5672.
celery.py
app = Celery('celery_test', include=['celery_test.tasks'])
app.config_from_object('celery_test.celeryconfig')
celeryconfig.py
broker_url = 'amqp://guest:guest@localhost:5673//'
result_backend = 'rpc://'
default_exchange = Exchange('default', type='direct')
test_exchange = Exchange('test_exchange', type='topic')
task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('test_queue', test_exchange, routing_key='test123test')
)
task_routes = {
'celery_test.tasks.test_method': {
'queue': 'test_queue'
}
}
tasks.py
@app.task
def test_method():
print('test_method')
return 'test_method'
А затем файл, который я использовал для отправки сообщения: send.py
connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5673/'))
channel = connection.channel()
exchange = 'test_exchange'
routing_key = 'test123test'
message = 'Testmessage'
channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)
connection.close()