Я пишу интеграционный тест, в котором я создаю контейнер rabbitMQ, используя -
docker run -d --hostname localhost -p 5672:5672 --name rabbit-tox -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq:3
Чтобы проверить, могу ли я подключиться к rabbitMQ из теста, я создал этот тест, и он может отправлять данные -
def test_rmq(self):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
Теперь я хочу использовать контейнер rabbitMQ в качестве бэкенда сельдерея, это код, который я использую -
from celery import Celery
broker_url = 'amqp://guest:guest@localhost:5672//'
app = Celery('test', broker=broker_url, backend='amqp')
from celery.contrib.testing.worker import start_worker
from swordfish_app import tasks
# Testing the consumer logic
class ServiceAccountCeleryTestCase(TransactionTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.celery_worker = start_worker(app)
cls.celery_worker.__enter__()
@classmethod
def tearDownClass(cls):
super().tearDownClass()
cls.celery_worker.__exit__(None, None, None)
def setUp(self):
super().setUp()
self.task = tasks.delete_all_service_accounts_when_user_inactive()
fake_obj_meta_del = V1ObjectMeta(self_link="deleted_service_account_link")
self.delete_namespaced_service_account_fake_data = V1Status(metadata=fake_obj_meta_del)
self.results = self.task.get()
@patch('kubernetes.client.CoreV1Api.delete_namespaced_service_account')
@patch('app.k8s.serviceaccount.get_inactive_serviceaccounts')
def test_delete_all_service_accounts_when_user_inactive(self, k8s_get_inactive_patch, k8s_del_sa_patch):
k8s_get_inactive_patch.return_value = ["sf-user-1", "sf-user-2"]
k8s_del_sa_patch.return_value = self.delete_namespaced_service_account_fake_data
assert self.task.state == "SUCCESS"
Когда я выполняю тест, я нахожу эту ошибку -
Creating test database for alias 'default'...
System check identified no issues (0 silenced).
...... [x] Sent 'Hello World!'
.E
======================================================================
ERROR: setUpClass (tests.test_service_accounts.ServiceAccountCeleryTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/mlokur/swordfish/src/tests/test_service_accounts.py", line 124, in setUpClass
cls.celery_worker.__enter__()
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/home/mlokur/venv/lib/python3.7/site-packages/celery/contrib/testing/worker.py", line 78, in start_worker
**kwargs) as worker:
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/home/mlokur/venv/lib/python3.7/site-packages/celery/contrib/testing/worker.py", line 103, in _start_worker_thread
assert 'celery.ping' in app.tasks
AssertionError
----------------------------------------------------------------------
Ran 7 tests in 1.842s
FAILED (errors=1)
Destroying test database for alias 'default'...
Я написал отдельный файл python для подключения и проверки -
from celery import Celery
import urllib.request
import os
# Where the downloaded files will be stored
BASEDIR="/home/celery/downloadedFiles"
# Create the app and set the broker location (RabbitMQ)
broker_url = 'amqp://guest:guest@localhost:5672//'
app = Celery('test', broker=broker_url, backend='amqp')
@app.task
def download(url, filename):
"""
Download a page and save it to the BASEDIR directory
url: the url to download
filename: the filename used to save the url in BASEDIR
"""
response = urllib.request.urlopen(url)
data = response.read()
with open("snap",'wb') as file:
file.write(data)
file.close()
, и этот пример задачи сельдерея правильно подключается к rabbitMQ и затем работает. Любая помощь приветствуется. Спасибо.