Я использую задачи сельдерея для реализации кода, который забирает код git для выполнения. Задача получает код из git-репо (ветка Master), клонирует его на машину, на которой работает сельдерей, и выполняет код. Если на компьютере присутствует локальная копия кода, то задача выполняет локальную копию или, если в основной ветви есть какие-либо изменения, она извлекает изменения и затем выполняет их.
Проблема, с которой я сталкиваюсь в настоящее время, заключается в том, что, если я делаю изменения в репо локально и выполняю задачу, он не воспринимает изменения, сделанные локально. Если я только потом перезапущу сельдерей, я могу выполнить локально модифицированный код.
Мои настройки сельдерея.
celery.py
"""Celery entrypoint"""
from celery import Celery
from waves.wavequeue import celery_config
app = Celery()
app.config_from_object(celery_config)
celery_config.py
"""Celery configuration"""
broker_url = 'redis://localhost'
result_backend = 'redis://localhost'
include = ['waves.core.execute', 'waves.waveengine.tasks']
worker_redirect_stdouts = False
result_expires = 3600
worker_hijack_root_logger = True
# Fix Hard time limit (300.0s) exceeded for task
task_time_limit = 86400 # 1 day
task_soft_time_limit = 86400 # 1 day
task_serializer = 'json'
Я определил пользовательское задание, как показано ниже:
class WaveTask(celery.Task):
@abstractmethod
def run(self, *args, **kwargs):
pass
def on_success(self, retval, task_id, args, kwargs):
logger.info('on_success task_id=%s retval=%s args=%s kwargs=%s', task_id, retval, args, kwargs)
def on_retry(self, exc, task_id, args, kwargs, einfo):
logger.error('on_retry task_id=%s exc=%s args=%s kwargs=%s einfo=%s', task_id, exc, args, kwargs, einfo)
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.error('on_failure task_id=%s exc=%s args=%s kwargs=%s einfo=%s', task_id, exc, args, kwargs, einfo)
waveengine / tasks.py
from waves.waveengine.wave_engine import run_wavelet
from waves.wavequeue.celery import app
from waves.wavequeue.wave_task import WaveTask
@app.task(base=WaveTask)
def run_wavelet_async(instance_id, wavelet_id, last_wavelet_id, wavelet: dict) -> bool:
return run_wavelet(instance_id, wavelet_id, last_wavelet_id, wavelet)
@app.task(base=WaveTask)
def run_wavelet_sync(instance_id, wavelet_id, last_wavelet_id, wavelet: dict) -> bool:
return run_wavelet(instance_id, wavelet_id, last_wavelet_id, wavelet)
Вышеуказанные задачи внутренне запускают функцию run_wavelet (), которая выполняет подобранный код.
Любая помощь будет полезна в устранении вышеуказанной проблемы.