Демон управления задачами - PullRequest
       5

Демон управления задачами

2 голосов
/ 10 сентября 2011

Мне нужно выполнить несколько длительных (я думаю, 2-3 дня) задач с моими данными Django ORM. Я смотрю вокруг и не нашел хороших решений.

django-tasks - http://code.google.com/p/django-tasks/ недостаточно хорошо документировано, и у меня нет идей, как его использовать.

сельдерей - http://ask.github.com/celery/ является чрезмерным для моих задач. Хорошо ли это для долгосрочных задач?

Итак, что мне нужно сделать, я просто получить все данные или части данных из моей базы данных, например:

Entry.objects.all()

А затем мне нужно выполнить одну и ту же функцию для каждого из QuerySet.

Я думаю, что это должно работать около 2-3 дней.

Так, может быть, кто-нибудь объяснит мне, как его построить.

P.S: на данный момент у меня есть только одна идея, использовать cron и базу данных для хранения временной шкалы выполнения процесса.

1 Ответ

1 голос
/ 11 сентября 2011

Используйте подзадачи Celery. Это позволит вам запустить долгосрочное задание (с множеством кратковременных подзадач под ним) и сохранить хорошие данные о состоянии выполнения в хранилище результатов задания Celery. В качестве дополнительного бонуса подзадачи будут распределены по рабочим процессам, что позволит вам в полной мере использовать преимущества многоядерных серверов или даже нескольких серверов, чтобы сократить время выполнения задачи.

РЕДАКТИРОВАТЬ: пример:

import time, logging as log
from celery.task import task
from celery.task.sets import TaskSet
from app import Entry

@task(send_error_emails=True)
def long_running_analysis():
    entries = list(Entry.objects.all().values('id'))
    num_entries = len(entries)
    taskset = TaskSet(analyse_entry.subtask(entry.id) for entry in entries)
    results = taskset.apply_async()
    while not results.ready()
        time.sleep(10000)
        print log.info("long_running_analysis is %d% complete",
                       completed_count()*100/num_entries)
    if results.failed():
        log.error("Analysis Failed!")
    result_set = results.join() # brings back results in 
                                # the order of entries
    #perform collating or count or percentage calculations here
    log.error("Analysis Complete!")

@task
def analyse_entry(id): # inputs must be serialisable
    logger = analyse_entry.get_logger()
    entry = Entry.objects.get(id=id)
    try:
        analysis = entry.analyse()
        logger.info("'%s' found to be %s.", entry, analysis['status'])
        return analysis # must be a dict or serialisable.
    except Exception as e:
        logger.error("Could not process '%s': %s", entry, e)
        return None 

Если ваши расчеты нельзя разделить на задачи для каждой записи, вы всегда можете настроить его так, чтобы одна подзадача выполняла подсчеты, одна подзадача выполняла другой тип анализа. и это все еще будет работать, и все же позволит вам выиграть от пареллеизма.

...