Сельдерей с непрерывным развертыванием - PullRequest
0 голосов
/ 03 октября 2018

У меня есть сервис, который предоставляет API, который затем передает задачи, он реализован с помощью Falcon (API) и Celery (управление задачами).

В частности, мои работники загружают много времени и их кодЭто выглядит примерно так:

class HeavyOp(celery.Task):
    def __init__(self):
        self._asset = get_heavy_asset()  # <-- takes long time

    @property
    def asset(self):
        return self._asset

@app.task(base=HeavyOp)
def my_task(data):

    return my_task.asset.do_something(data)

На самом деле происходит то, что в функции __init__ некоторый объект читается с диска и сохраняется в памяти до тех пор, пока работник живет.

Иногда я хочу обновить этот объект.

Есть ли способ перезагрузить работника без простоев?Поскольку все это связано с API, я не хотел бы, чтобы эти несколько минут загрузки тяжелого объекта были простои.

Мы можем предположить, что хост имеет более 1 ядра, но решение должно быть однимхост-решение.

1 Ответ

0 голосов
/ 15 октября 2018

Не думаю, что вам нужен пользовательский базовый класс задач.То, чего вы хотите добиться, - это класс активов одного экземпляра, который загружается после инициализации работника, и вы можете перезагрузить его из задачи.

Этот подход работает:

# worker.py
import os
import sys
import time

from celery import Celery
from celery.signals import worker_ready


app = Celery(include=('tasks',))

class Asset:

    def __init__(self):
        self.time = time.time()



class AssetLoader:
    __shared_state = {}

    def __init__(self):
        self.__dict__ = self.__shared_state
        if '_value' not in self.__dict__:
            self.get_heavy_asset()

    def get_heavy_asset(self):
        self._value = Asset()

    @property
    def value(self):
        return self._value


@worker_ready.connect
def after_worker_ready(sender, **kwargs):
    AssetLoader()

Здесь я сделалAssetLoader - класс Borg, но вы можете выбрать любой другой шаблон / стратегию для совместного использования одного экземпляра Asset.Для наглядности я просто фиксирую временную метку при выполнении get_heavy_asset.

# tasks.py
from worker import app, AssetLoader


@app.task(bind=True)
def load(self):
    AssetLoader().get_heavy_asset()
    return AssetLoader().value.time

@app.task(bind=True)
def my_task(self):
    return AssetLoader().value.time

. Имейте в виду, что Актив распределяется по рабочему процессу, но не по рабочим.Если вы запускаете с concurrency=1, это не имеет значения, но для всего остального это делает.Но из того, что я понял в вашем случае использования, все должно быть в порядке в любом случае.

...