Тестирование Celery Beat - PullRequest
       8

Тестирование Celery Beat

1 голос
/ 14 апреля 2019

Я работаю над заданием на сельдерей в рамках проекта django, который периодически создает записи в базе данных.Я знаю, потому что, когда я ставлю задачу так:

celery.py:

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery
from celery.schedules import crontab

app = Celery("clock-backend", broker=os.environ.get("RABBITMQ_URL"))

app.config_from_object("django.conf:settings", namespace="CELERY")
app.conf.beat_schedule = {

    'create_reports_monthly': {
        'task': 'project_celery.tasks.create_reports_monthly',
        'schedule': 10.0,
    },
}
app.autodiscover_tasks()

И запускаю свой проект, он действительно создает объект каждые 10 секунд.

Но то, что я действительно хочу сделать, это настроить его запуск каждый первый день месяца.

Для этого я бы изменил "schedule": crontab(0, 0, day_of_month="1").

Вот моя настоящая проблема:Как мне проверить, что это действительно работает?

Под тестированием я подразумеваю фактические (модульные) тесты.

Я пытался работать с пакетом под названием freezegun * 1018.*.Тест с этим выглядит так:

def test_start_of_month_report_creation(self, user_object, contract_object, report_object):
    # set time to the last day of January
    with freeze_time("2019-01-31 23:59:59") as frozen_time:
        # let one second pass
        frozen_time.tick()
        # give the celery task some time
        time.sleep(20)
        # Test Logic to check whether the object was created
        # Example: assert MyModel.objects.count() > 0

Но это не сработало.Я подозреваю, что ритм сельдерея использует не время, установленное через freezgun / python, а настоящие «аппаратные» часы.

Я также пытался установить Hardwareclock как здесь , но это не такработать в моей настройке.

Я благодарен за любые комментарии, замечания или помощь по этой теме, так как я действительно хотел бы реализовать тест для этого.

...