На основе учебника: https://www.merixstudio.com/blog/django-celery-beat/
код файла celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from Backend.settings import INSTALLED_APPS
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')
app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: INSTALLED_APPS)
файл tasks.py код
from celery.task import task
@task(name='task1')
def emailt():
print("email func invoked")
# code...
settings.py
from __future__ import absolute_import
import os
import djcelery
from celery.schedules import crontab
djcelery.setup_loader()
INSTALLED_APPS = [
'djcelery',
'django_celery_beat',
....
]
REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
CELERY_RESULT_BACKEND = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TIMEZONE = 'Asia/Kolkata'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERYBEAT_SCHEDULE = {
'task1':
{
'task': 'proj.tasks.emailt',
'schedule': crontab(hour='*', minute='1', day_of_week='mon,tue,wed,thu,fri,sat,sun'),
}
}
В одной командной оболочке запущен сервер redis и django py manage.py runserver. В другой оболочке команда celery запускается следующим образом: celery -A proj.tasks beat -l INFO --scheduler django_celery_beat.schedulers: DatabaseScheduler
Файлы журнала означают, что работает celerybeat.
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> djcelery.loaders.DjangoLoader
. scheduler -> django_celery_beat.schedulers.DatabaseScheduler
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 seconds (5s)
[*************: INFO/MainProcess] beat: Starting...
[*************: INFO/MainProcess] Writing entries...
[*************: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[*************: INFO/MainProcess] Writing entries...
[*************: INFO/MainProcess] Writing entries...
Однако функция emailt () в tasks.py по-прежнему не вызывается. Я не могу найти проблему с сельдереем.