Как управлять заданиями в сельдерее? - PullRequest
0 голосов
/ 16 июля 2011

У меня есть задача с определенным параметром, и я хочу знать, существует ли задача с таким же параметром. У меня есть следующее:

@task
def some_task(id):
    some_task.update_state(state="PROGRESS", meta={"id": id})
    some_action_by_id(id)

Но мне бы хотелось:

@task
def some_task(id):
    if !check_task(id):
    some_task.update_state(state="PROGRESS", meta={"id": id})
    some_action_by_id(id)   

Как мне это сделать?

Ответы [ 2 ]

0 голосов
/ 10 июня 2013

вот мое решение:

from celery.task.control import inspect
from celery.result import AsyncResult

def get_same_task(aTaskName, aArgs, aHosts):
    for jobs in aHosts.values():
        for job in jobs:
            if job['name'] == aTaskName and job['args']  == str(aArgs):
                return job['id']
    return None

class IgnoreSameArgumentsTask(Task):
    abstract = True
    inspect = inspect()

    def delay(self,  *args, **kwargs):
        vHosts_Jobs   = self.inspect.active()
        vTaskId = get_same_task(self.name, args, vHosts_Jobs)
        if vTaskId != None:
            return AsyncResult(vTaskId)
        else:
            return super(IgnoreSameArgumentsTask, self).delay(*args,  **kwargs)

@celery.task(base=IgnoreSameArgumentsTask)
def add(x, y):
    sleep(x+y)
    return x + y
0 голосов
/ 17 июля 2011

Вы, вероятно, имеете в виду, что вы хотите, чтобы только один "some_task" выполнялся для каждого уникального идентификатора в каждый момент. Таким образом, вы должны реализовать механизм блокировки. Взгляните здесь . Сельдерей отлично играет с Redis!

...