Как использовать rabbitMQ в качестве брокера для сельдерея в рамках Django юнит-теста - PullRequest
0 голосов
/ 13 марта 2020

Я пишу интеграционный тест, в котором я создаю контейнер rabbitMQ, используя -

docker run -d --hostname localhost -p 5672:5672 --name rabbit-tox -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq:3

Чтобы проверить, могу ли я подключиться к rabbitMQ из теста, я создал этот тест, и он может отправлять данные -

def test_rmq(self):
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672))
        channel = connection.channel()

        channel.queue_declare(queue='hello')
        channel.basic_publish(exchange='',
                              routing_key='hello',
                              body='Hello World!')
        print(" [x] Sent 'Hello World!'")
        connection.close()

Теперь я хочу использовать контейнер rabbitMQ в качестве бэкенда сельдерея, это код, который я использую -

from celery import Celery
broker_url = 'amqp://guest:guest@localhost:5672//'
app = Celery('test', broker=broker_url, backend='amqp')

from celery.contrib.testing.worker import start_worker
from swordfish_app import tasks


# Testing the consumer logic
class ServiceAccountCeleryTestCase(TransactionTestCase):

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.celery_worker = start_worker(app)
        cls.celery_worker.__enter__()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()
        cls.celery_worker.__exit__(None, None, None)

    def setUp(self):
        super().setUp()
        self.task = tasks.delete_all_service_accounts_when_user_inactive()
        fake_obj_meta_del = V1ObjectMeta(self_link="deleted_service_account_link")
        self.delete_namespaced_service_account_fake_data = V1Status(metadata=fake_obj_meta_del)
        self.results = self.task.get()

    @patch('kubernetes.client.CoreV1Api.delete_namespaced_service_account')
    @patch('app.k8s.serviceaccount.get_inactive_serviceaccounts')
    def test_delete_all_service_accounts_when_user_inactive(self, k8s_get_inactive_patch, k8s_del_sa_patch):
        k8s_get_inactive_patch.return_value = ["sf-user-1", "sf-user-2"]
        k8s_del_sa_patch.return_value = self.delete_namespaced_service_account_fake_data
        assert self.task.state == "SUCCESS"

Когда я выполняю тест, я нахожу эту ошибку -

Creating test database for alias 'default'...
System check identified no issues (0 silenced).
...... [x] Sent 'Hello World!'
.E
======================================================================
ERROR: setUpClass (tests.test_service_accounts.ServiceAccountCeleryTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/mlokur/swordfish/src/tests/test_service_accounts.py", line 124, in setUpClass
    cls.celery_worker.__enter__()
  File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/home/mlokur/venv/lib/python3.7/site-packages/celery/contrib/testing/worker.py", line 78, in start_worker
    **kwargs) as worker:
  File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/home/mlokur/venv/lib/python3.7/site-packages/celery/contrib/testing/worker.py", line 103, in _start_worker_thread
    assert 'celery.ping' in app.tasks
AssertionError

----------------------------------------------------------------------
Ran 7 tests in 1.842s

FAILED (errors=1)
Destroying test database for alias 'default'...

Я написал отдельный файл python для подключения и проверки -

from celery import Celery
import urllib.request
import os

# Where the downloaded files will be stored
BASEDIR="/home/celery/downloadedFiles"

# Create the app and set the broker location (RabbitMQ)

broker_url = 'amqp://guest:guest@localhost:5672//'
app = Celery('test', broker=broker_url, backend='amqp')

@app.task
def download(url, filename):
    """
    Download a page and save it to the BASEDIR directory
      url: the url to download
      filename: the filename used to save the url in BASEDIR
    """
    response = urllib.request.urlopen(url)
    data = response.read()
    with open("snap",'wb') as file:
        file.write(data)
    file.close()

, и этот пример задачи сельдерея правильно подключается к rabbitMQ и затем работает. Любая помощь приветствуется. Спасибо.

...