У меня небольшая концептуальная проблема с установкой задачи Celery на основе классов (просто быстрое замечание, это отличается, поскольку я не унаследован от класса celery.Task
). Связанная задача приложения является просто методом класса.
В моем celery.py file
есть следующее:
import os
import celery
from celery.schedules import crontab
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Wave.settings')
app = celery.Celery('DjangoProject', include=['amq.v1.TaskRunner.demand', 'amq.v1.TaskRunner.periodic'])
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
crontab(minute='1,11,21,31,41,51', hour='*', day_of_week='*', day_of_month='*'),
APIMiddelwareTasks().dispatch_bookings_fetch.s(),
)
class APIMiddelwareTasks():
def get_middleware_class(self, *args, **kwargs):
from django.conf import settings
return MiddlewareAPIClass(
headers={'Content-Type': 'application/soap+xml; charset=UTF-8'},
config=settings.middleware_config,
debug=False
)
@app.task(bind=True)
def dispatch_bookings_fetch(self, *args, **kwargs):
# This is the main call for the MiddlewareAPI wrapper class. The config in settings.py contains the parameters from both the sandbox and production workstations.
# Setting sandbox to True will query the sandbox workstation, and setting debug to True will query known session dates and times.
middleware = self.get_middleware_class()
...
Однако приведенное выше, похоже, приводит к следующей ошибке:
AttributeError: 'dispatch_surfers_bookings' object has no attribute 'get_middleware_class'
Кто-нибудь знает лучший способ создать класс, основанный на нескольких задачах приложения, с возможностью ссылки на различные self.methods()
в рамках этой задачи? Должен ли я просто отключить это и связать задачу отдельно от класса?