Я пытаюсь заставить функцию работать в моем проекте django с celerybeat, которая импортирует функцию на основе классов из библиотеки-оболочки.Я читал, что сельдерей не работает с классами слишком легко.моя функция login_mb не принимает аргумент, но когда я пытаюсь зарегистрироваться и вызвать эту задачу, я получаю ошибку Couldn't apply scheduled task login_mb: login_mb() takes 0 positional arguments but 1 was given
Это из-за self в импортированной функции-обертке?
Что я могу сделать, чтобы получить этоработать с сельдереем?
settings.py
CELERY_BEAT_SCHEDULE = {
'login_mb': {
'task': 'backend.tasks.login_mb',
'schedule': timedelta(minutes=30),
} ,
tasks.py
from matchbook.apiclient import APIClient
import logging
from celery import task
log = logging.getLogger(__name__)
@shared_task(bind=True)
def login_mb():
mb = APIClient('abc', '123')
mb.login()
mb.keep_alive()
apiclient.py (библиотека оболочки)
from matchbook.baseclient import BaseClient
from matchbook import endpoints
class APIClient(BaseClient):
def __init__(self, username, password=None):
super(APIClient, self).__init__(username, password)
self.login = endpoints.Login(self)
self.keep_alive = endpoints.KeepAlive(self)
self.logout = endpoints.Logout(self)
self.betting = endpoints.Betting(self)
self.account = endpoints.Account(self)
self.market_data = endpoints.MarketData(self)
self.reference_data = endpoints.ReferenceData(self)
self.reporting = endpoints.Reporting(self)
def __repr__(self):
return '<APIClient [%s]>' % self.username
def __str__(self):
return 'APIClient'