Задачи Celery, не принимающие локальное git-репо, изменили код для выполнения - PullRequest
0 голосов
/ 11 сентября 2018

Я использую задачи сельдерея для реализации кода, который забирает код git для выполнения. Задача получает код из git-репо (ветка Master), клонирует его на машину, на которой работает сельдерей, и выполняет код. Если на компьютере присутствует локальная копия кода, то задача выполняет локальную копию или, если в основной ветви есть какие-либо изменения, она извлекает изменения и затем выполняет их.

Проблема, с которой я сталкиваюсь в настоящее время, заключается в том, что, если я делаю изменения в репо локально и выполняю задачу, он не воспринимает изменения, сделанные локально. Если я только потом перезапущу сельдерей, я могу выполнить локально модифицированный код.

Мои настройки сельдерея.

celery.py

"""Celery entrypoint"""

from celery import Celery
from waves.wavequeue import celery_config

app = Celery()
app.config_from_object(celery_config)

celery_config.py

"""Celery configuration"""

broker_url = 'redis://localhost'
result_backend = 'redis://localhost'

include = ['waves.core.execute', 'waves.waveengine.tasks']
worker_redirect_stdouts = False
result_expires = 3600
worker_hijack_root_logger = True

# Fix Hard time limit (300.0s) exceeded for task
task_time_limit = 86400  # 1 day
task_soft_time_limit = 86400  # 1 day
task_serializer = 'json'

Я определил пользовательское задание, как показано ниже:

class WaveTask(celery.Task):

    @abstractmethod
    def run(self, *args, **kwargs):
        pass

    def on_success(self, retval, task_id, args, kwargs):
        logger.info('on_success task_id=%s retval=%s args=%s kwargs=%s',   task_id, retval, args, kwargs)

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.error('on_retry task_id=%s exc=%s args=%s kwargs=%s einfo=%s', task_id, exc, args, kwargs, einfo)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error('on_failure task_id=%s exc=%s args=%s kwargs=%s einfo=%s', task_id, exc, args, kwargs, einfo)

waveengine / tasks.py

from waves.waveengine.wave_engine import run_wavelet
from waves.wavequeue.celery import app
from waves.wavequeue.wave_task import WaveTask


@app.task(base=WaveTask)
def run_wavelet_async(instance_id, wavelet_id, last_wavelet_id, wavelet: dict) -> bool:
return run_wavelet(instance_id, wavelet_id, last_wavelet_id, wavelet)


@app.task(base=WaveTask)
def run_wavelet_sync(instance_id, wavelet_id, last_wavelet_id, wavelet: dict) -> bool:
return run_wavelet(instance_id, wavelet_id, last_wavelet_id, wavelet)

Вышеуказанные задачи внутренне запускают функцию run_wavelet (), которая выполняет подобранный код.

Любая помощь будет полезна в устранении вышеуказанной проблемы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...