Я никогда раньше не использовал Celery и пытаюсь правильно его настроить.Я использую Redis в качестве брокера и хостинг на Heroku.Я впервые пытаюсь запустить асинхронные задачи, и я изо всех сил.У меня есть команда управления, которую я хотел бы периодически запускать.
celery.py
from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
import django
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'coffee.settings')
app = Celery('coffee')
app.config_from_object('django.conf:settings', namespace = 'CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind= True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
app.conf.beat_schedule = {
'add-every-30-seconds':{
'task': 'inventory.tasks.boarshead',
'schedule' : 30.0,
'args' : ()
},
}
settings.py
CACHES = {
"default": {
"BACKEND": "redis_cache.RedisCache",
"LOCATION": os.environ.get('REDIS_URL'),
}
}
tasks.py
from celery import shared_task
import celery
import time
from django.core import management
@celery.task
def boarshead():
try:
print("in celery module")
"""Boarshead expired sessions by using Django Management Command."""
management.call_command("clearsessions", verbosity=0)
CreateBoarsHeadList.py
return "success"
except:
print(e)
init .py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
procfile
worker: celery worker --app=tasks.inventory.app