Как эффективно выполнить много задач «чуть позже» в Python? - PullRequest
20 голосов
/ 14 июля 2011

У меня есть процесс, который должен выполнить кучу действий «позже» (обычно через 10-60 секунд). Проблема в том, что эти «более поздние» действия могут быть много (1000 с), поэтому использование Thread для каждой задачи нецелесообразно. Я знаю о существовании таких инструментов, как gevent и eventlet , но одна из проблем заключается в том, что процесс использует zeromq для связи, поэтому мне потребуется некоторая интеграция ( У eventlet уже есть).

Что мне интересно, так это Какие у меня есть варианты? Итак, предложения приветствуются в отношении библиотек (если вы использовали какой-либо из перечисленных, пожалуйста, поделитесь своим опытом), методов ( Поддержка сопрограмм Python , использовать один поток, который некоторое время спит и проверяет очередь), как использовать опрос zeromq или eventloop для выполнения работы, или что-то еще.

Ответы [ 10 ]

19 голосов
/ 14 июля 2011

рассмотрите возможность использования очереди приоритетов с одним или несколькими рабочими потоками для обслуживания задач. Основной поток может добавить работу в очередь с отметкой времени, в котором она будет обслуживаться в ближайшее время. Рабочие потоки извлекают работу из очереди, спят до достижения значения приоритета, выполняют работу, а затем выталкивают другой элемент из очереди.

Как насчет более детального ответа. Маклаубер делает хорошую мысль. Если есть вероятность, что все ваши работники могут спать, когда у вас есть новая, более срочная работа, то queue.PriorityQueue на самом деле не является решением, хотя «приоритетная очередь» по-прежнему остается техникой для использования, которая доступна из heapq модуль. Вместо этого мы будем использовать другой примитив синхронизации; переменная условия, которая в python пишется threading.Condition.

Подход довольно прост, загляните в кучу, и если работа текущая, вытолкните ее и сделайте эту работу. Если была работа, но она запланирована на будущее, просто подожди до тех пор, пока работа не закончится, или, если работы вообще не будет, спи навсегда.

продюсер делает свою долю работы; каждый раз, когда он добавляет новую работу, он уведомляет об этом условии, поэтому, если есть спящие рабочие, они просыпаются и перепроверяют очередь на новую работу.

import heapq, time, threading

START_TIME = time.time()
SERIALIZE_STDOUT = threading.Lock()
def consumer(message):
    """the actual work function.  nevermind the locks here, this just keeps
       the output nicely formatted.  a real work function probably won't need
       it, or might need quite different synchronization"""
    SERIALIZE_STDOUT.acquire()
    print time.time() - START_TIME, message
    SERIALIZE_STDOUT.release()

def produce(work_queue, condition, timeout, message):
    """called to put a single item onto the work queue."""
    prio = time.time() + float(timeout)
    condition.acquire()
    heapq.heappush(work_queue, (prio, message))
    condition.notify()
    condition.release()

def worker(work_queue, condition):
    condition.acquire()
    stopped = False
    while not stopped:
        now = time.time()
        if work_queue:
            prio, data = work_queue[0]
            if data == 'stop':
                stopped = True
                continue
            if prio < now:
                heapq.heappop(work_queue)
                condition.release()
                # do some work!
                consumer(data)
                condition.acquire()
            else:
                condition.wait(prio - now)
        else:
            # the queue is empty, wait until notified
            condition.wait()
    condition.release()

if __name__ == '__main__':
    # first set up the work queue and worker pool
    work_queue = []
    cond = threading.Condition()
    pool = [threading.Thread(target=worker, args=(work_queue, cond))
            for _ignored in range(4)]
    map(threading.Thread.start, pool)

    # now add some work
    produce(work_queue, cond, 10, 'Grumpy')
    produce(work_queue, cond, 10, 'Sneezy')
    produce(work_queue, cond, 5, 'Happy')
    produce(work_queue, cond, 10, 'Dopey')
    produce(work_queue, cond, 15, 'Bashful')
    time.sleep(5)
    produce(work_queue, cond, 5, 'Sleepy')
    produce(work_queue, cond, 10, 'Doc')

    # and just to make the example a bit more friendly, tell the threads to stop after all
    # the work is done
    produce(work_queue, cond, float('inf'), 'stop')
    map(threading.Thread.join, pool)
11 голосов
/ 19 июля 2011

У этого ответа на самом деле есть два предложения - мое первое и другое, которое я обнаружил после первого.

sched

Я подозреваю, что вы ищете модуль sched.

РЕДАКТИРОВАТЬ : мое голое предложение казалось мало полезным после того, как я его прочитал.Поэтому я решил протестировать модуль sched, чтобы увидеть, может ли он работать, как я предложил.Вот мой тест: я бы использовал его с единственным потоком, более или менее таким образом:

class SchedulingThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

    def run(self):
        self.scheduler.run()

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print "Registerd event", event
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

Во-первых, я бы создал класс потока, который имел бы свой собственный планировщик и очередь.Как минимум одно событие будет зарегистрировано в планировщике: одно для вызова метода для планирования событий из очереди.

class SchedulingThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

Метод для планирования событий из очереди будет блокировать очередь, планировать каждое событие, пустойочередь и расписание снова, чтобы искать новые события когда-нибудь в будущем.Обратите внимание, что период поиска новых событий короткий (одна секунда), вы можете изменить его:

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print "Registerd event", event
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

В классе также должен быть метод для планирования пользовательских событий.Естественно, этот метод должен блокировать очередь при ее обновлении:

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

Наконец, класс должен вызвать основной метод планировщика:

    def run(self):
        self.scheduler.run()

Вот пример использования:

def print_time():
    print "scheduled:", time.time()


if __name__ == "__main__":
    st = SchedulingThread()
    st.start()          
    st.schedule(print_time, 10)

    while True:
        print "main thread:", time.time()
        time.sleep(5)

    st.join()

Его вывод на моей машине:

$ python schedthread.py
main thread: 1311089765.77
Registerd event (10, 1, <function print_time at 0x2f4bb0>, ())
main thread: 1311089770.77
main thread: 1311089775.77
scheduled: 1311089776.77
main thread: 1311089780.77
main thread: 1311089785.77

Этот код - просто быстрый и грязный пример, может потребоваться некоторая работа.Тем не менее, я должен признаться, что я немного очарован модулем sched, поэтому я предложил это.Возможно, вы захотите найти и другие предложения:)

APScheduler

Глядя в Google на решения, подобные тому, что я опубликовал, я нашел этот потрясающий модуль APScheduler ,Это настолько практично и полезно, что могу поспорить, что - ваше решение.Мой предыдущий пример был бы намного проще с этим модулем:

from apscheduler.scheduler import Scheduler
import time

sch = Scheduler()
sch.start()

@sch.interval_schedule(seconds=10)

def print_time():
    print "scheduled:", time.time()
    sch.unschedule_func(print_time)

while True:
    print "main thread:", time.time()
    time.sleep(5)

(К сожалению, я не нашел, как запланировать выполнение события только один раз, поэтому событие функции должно быть отменено по расписанию. Бьюсь об заклад, это можно решитьс каким-то декоратором.)

7 голосов
/ 26 июля 2011

Если у вас есть куча задач, которые нужно выполнить позже, и вы хотите, чтобы они сохранялись, даже если вы закрываете вызывающую программу или своих работников, вам действительно стоит заглянуть в Celery , который делает Это очень легко создавать новые задачи, запускать их на любой машине и ждать результатов.

На странице "Сельдерей" это простая задача с добавлением двух чисел: "

from celery.task import task

@task
def add(x, y):
    return x + y

Вы можете выполнить задачу в фоновом режиме или дождаться ее завершения:

>>> result = add.delay(8, 8)
>>> result.wait() # wait for and return the result
16
3 голосов
/ 26 июля 2011

Вы писали:

Одна из проблем заключается в том, что процесс использует zeromq для связи, поэтому мне потребуется некоторая интеграция (она уже есть в eventlet)

Похожена ваш выбор будут сильно влиять эти детали, которые немного неясны: как zeromq используется для связи, сколько ресурсов потребуется для интеграции, и каковы ваши требования и доступные ресурсы.


Существует проект под названием django-ztask , который использует zeromq и предоставляет декоратор task, аналогичный сельдерею.Тем не менее, он (очевидно) специфичен для Django и поэтому может не подойти в вашем случае.Я не использовал его, предпочитаю сельдерей сам.

Я использовал сельдерей для нескольких проектов (они размещены на ep.io PaaS-хостинг, который предоставляет простой способ его использования).

Celery выглядит как довольно гибкое решение, позволяющее задерживать задачи, выполнять обратные вызовы, истечение срока и повторное выполнение задачи, ограничивать скорость выполнения задачи и т. Д. Может использоваться сRedis, Beanstalk, CouchDB, MongoDB или база данных SQL.

Пример кода (определение задачи и асинхронное выполнение после задержки):

from celery.decorators import task

@task
def my_task(arg1, arg2):
    pass # Do something

result = my_task.apply_async(
    args=[sth1, sth2], # Arguments that will be passed to `my_task()` function.
    countdown=3, # Time in seconds to wait before queueing the task.
)

См. Также раздел в сельдереедокументы .

2 голосов
/ 20 июля 2011

Вы смотрели на модуль multiprocessing?Стандартно поставляется с Python.Он похож на модуль threading, но запускает каждую задачу в процессе.Вы можете использовать объект Pool() для настройки рабочего пула, а затем использовать метод .map() для вызова функции с различными аргументами задачи в очереди.

1 голос
/ 24 июля 2011

Pyzmq имеет реализацию ioloop с API-интерфейсом, аналогичным ioloop торнадо. Он реализует DelayedCallback, который может вам помочь.

0 голосов
/ 26 июля 2011

Simple. Вы можете унаследовать свой класс от Thread и создать экземпляр своего класса с параметром timem, таким образом, для каждого экземпляра вашего класса вы можете сказать timeout, который заставит ваш поток ждать этого времени

0 голосов
/ 25 июля 2011

Ну, на мой взгляд, вы могли бы использовать то, что называется "совместная многозадачность".Это искаженная вещь, и это действительно круто.Просто посмотрите на презентацию PyCon 2010 года: http://blip.tv/pycon-us-videos-2009-2010-2011/pycon-2010-cooperative-multitasking-with-twisted-getting-things-done-concurrently-11-3352182

Что ж, вам понадобится очередь транспорта, чтобы сделать это тоже ...

0 голосов
/ 22 июля 2011

Другим вариантом является использование привязок Phyton GLib , в частности его функций timeout.

Это хороший выбор, если вы не хотите использовать несколько ядер, и если зависимость от GLib не представляет проблемы. Он обрабатывает все события в одном потоке, что предотвращает проблемы синхронизации. Кроме того, его структура событий может также использоваться для наблюдения и обработки событий на основе ввода-вывода (т.е. сокетов).

UPDATE:

Вот живой сеанс с использованием GLib:

>>> import time
>>> import glib
>>> 
>>> def workon(thing):
...     print("%s: working on %s" % (time.time(), thing))
...     return True # use True for repetitive and False for one-time tasks
... 
>>> ml = glib.MainLoop()
>>> 
>>> glib.timeout_add(1000, workon, "this")
2
>>> glib.timeout_add(2000, workon, "that")
3
>>> 
>>> ml.run()
1311343177.61: working on this
1311343178.61: working on that
1311343178.61: working on this
1311343179.61: working on this
1311343180.61: working on this
1311343180.61: working on that
1311343181.61: working on this
1311343182.61: working on this
1311343182.61: working on that
1311343183.61: working on this
0 голосов
/ 21 июля 2011

Предполагая, что ваш процесс имеет цикл выполнения, который может принимать сигналы, а продолжительность каждого действия находится в пределах границ последовательной операции, использовать сигналы и сигналы тревоги posix ()

    signal.alarm(time)
If time is non-zero, this function requests that a 
SIGALRM signal be sent to the process in time seconds. 

Это зависит от того, что выИмеется в виду «», эти «более поздние» действия могут быть очень «1005 *», и если ваш процесс уже использует сигналы.Из-за формулировки вопроса неясно, зачем нужен внешний пакет python.

...