безопасность потоков в django с асинхронными задачами и redis - PullRequest
4 голосов
/ 03 апреля 2011

У меня есть приложение django, которое вызывает асинхронную задачу для набора запросов (используя сельдерей).Задача принимает набор запросов и выполняет целый набор операций, которые могут потенциально занять очень много времени, основываясь на объектах в них.Объекты могут быть общими для всех наборов запросов, поэтому пользователь может отправить задачу в набор запросов, который содержит объекты, которые уже запущены, и эта новая задача должна выполняться только на тех объектах, которые еще не запущены, но ждать завершения всех объектов.прежде чем он вернется.

Мое объяснение немного сбивает с толку, поэтому представьте следующий код:

from time import sleep
import redis
from celery.task import Task
from someapp.models import InterestingModel
from someapp.longtime import i_take_a_while

class LongRunningTask(Task):
    def run(self, process_id, *args, **kwargs):
        _queryset = InterestingModel.objects.filter(process__id=process_id)

        r = redis.Redis()
        p = r.pipeline()
        run_check_sets = ('run_check', 'objects_already_running')

        # There must be a better way to do this:
        for o in _queryset.values_list('pk', flat=True):
            p.sadd('run_check')
        p.sdiff(run_check_sets) # Objects that need to be run
        p.sunion(run_check_sets) # Objects that we need to wait for
        p.sunionstore('objects_already_running',run_check_sets)
        p.delete('run_check')
        redis_result = p.execute()

        objects_to_run = redis_result[-3]
        objects_to_wait_for = redis_result[-2]

        if objects_to_run:
            i_take_a_while(objects_to_run)
            p = r.pipeline()
            for o in objects_to_run:
                p.srem('objects_already_running', o)
            p.execute()

        while objects_to_wait_for:
            p = r.pipeline()
            for o in objects_to_wait_for:
                p.sismember('objects_already_running',o)
            redis_result = p.execute()
            objects_to_wait_for = [objects_to_wait_for[i] for i, member in enumerate(redis_result) if member]
            # Probably need to add some sort of timeout here or in redis
            sleep(30) 

Я чрезвычайно новичок в Redis, поэтому мой главный вопрос в том, существует ли более эффективный способ манипулирования Redisдобиться того же результата.В более широком смысле, мне интересно, нужен ли Redis / правильный подход к решению этой проблемы.Кажется, должен быть лучший способ взаимодействия моделей Django с Redis.Наконец, мне интересно, является ли этот код потокобезопасным.Может ли кто-нибудь пробить дыры в моей логике?

Любые комментарии приветствуются.

1 Ответ

2 голосов
/ 26 апреля 2011

Возможно ли для вас это немного по-другому? В частности, я бы запускал задачи для каждого объекта и затем сохранял где-нибудь информацию о ваших долго выполняющихся заданиях (например, базу данных, кэш и т. Д.). Когда каждый отдельный объект был завершен, он обновлял информацию о продолжительном задании и проверял, все ли задания возвращены. Если это так, то вы можете запустить любой код, который нужно запустить, когда завершится долгосрочное задание.

Преимущество этого состоит в том, что вы не привязываете нить на своем сервере, пока вы ждете, пока не произойдут другие события. На стороне клиента вы могли бы периодически проверять состояние долго выполняющегося задания и даже использовать количество завершенных объектов для обновления индикатора выполнения, если хотите.

...