Не думаю, что вам нужен пользовательский базовый класс задач.То, чего вы хотите добиться, - это класс активов одного экземпляра, который загружается после инициализации работника, и вы можете перезагрузить его из задачи.
Этот подход работает:
# worker.py
import os
import sys
import time
from celery import Celery
from celery.signals import worker_ready
app = Celery(include=('tasks',))
class Asset:
def __init__(self):
self.time = time.time()
class AssetLoader:
__shared_state = {}
def __init__(self):
self.__dict__ = self.__shared_state
if '_value' not in self.__dict__:
self.get_heavy_asset()
def get_heavy_asset(self):
self._value = Asset()
@property
def value(self):
return self._value
@worker_ready.connect
def after_worker_ready(sender, **kwargs):
AssetLoader()
Здесь я сделалAssetLoader - класс Borg, но вы можете выбрать любой другой шаблон / стратегию для совместного использования одного экземпляра Asset.Для наглядности я просто фиксирую временную метку при выполнении get_heavy_asset
.
# tasks.py
from worker import app, AssetLoader
@app.task(bind=True)
def load(self):
AssetLoader().get_heavy_asset()
return AssetLoader().value.time
@app.task(bind=True)
def my_task(self):
return AssetLoader().value.time
. Имейте в виду, что Актив распределяется по рабочему процессу, но не по рабочим.Если вы запускаете с concurrency=1
, это не имеет значения, но для всего остального это делает.Но из того, что я понял в вашем случае использования, все должно быть в порядке в любом случае.