У этого ответа на самом деле есть два предложения - мое первое и другое, которое я обнаружил после первого.
sched
Я подозреваю, что вы ищете модуль sched
.
РЕДАКТИРОВАТЬ : мое голое предложение казалось мало полезным после того, как я его прочитал.Поэтому я решил протестировать модуль sched
, чтобы увидеть, может ли он работать, как я предложил.Вот мой тест: я бы использовал его с единственным потоком, более или менее таким образом:
class SchedulingThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.scheduler = sched.scheduler(time.time, time.sleep)
self.queue = []
self.queue_lock = threading.Lock()
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
def run(self):
self.scheduler.run()
def schedule(self, function, delay):
with self.queue_lock:
self.queue.append((delay, 1, function, ()))
def _schedule_in_scheduler(self):
with self.queue_lock:
for event in self.queue:
self.scheduler.enter(*event)
print "Registerd event", event
self.queue = []
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
Во-первых, я бы создал класс потока, который имел бы свой собственный планировщик и очередь.Как минимум одно событие будет зарегистрировано в планировщике: одно для вызова метода для планирования событий из очереди.
class SchedulingThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.scheduler = sched.scheduler(time.time, time.sleep)
self.queue = []
self.queue_lock = threading.Lock()
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
Метод для планирования событий из очереди будет блокировать очередь, планировать каждое событие, пустойочередь и расписание снова, чтобы искать новые события когда-нибудь в будущем.Обратите внимание, что период поиска новых событий короткий (одна секунда), вы можете изменить его:
def _schedule_in_scheduler(self):
with self.queue_lock:
for event in self.queue:
self.scheduler.enter(*event)
print "Registerd event", event
self.queue = []
self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())
В классе также должен быть метод для планирования пользовательских событий.Естественно, этот метод должен блокировать очередь при ее обновлении:
def schedule(self, function, delay):
with self.queue_lock:
self.queue.append((delay, 1, function, ()))
Наконец, класс должен вызвать основной метод планировщика:
def run(self):
self.scheduler.run()
Вот пример использования:
def print_time():
print "scheduled:", time.time()
if __name__ == "__main__":
st = SchedulingThread()
st.start()
st.schedule(print_time, 10)
while True:
print "main thread:", time.time()
time.sleep(5)
st.join()
Его вывод на моей машине:
$ python schedthread.py
main thread: 1311089765.77
Registerd event (10, 1, <function print_time at 0x2f4bb0>, ())
main thread: 1311089770.77
main thread: 1311089775.77
scheduled: 1311089776.77
main thread: 1311089780.77
main thread: 1311089785.77
Этот код - просто быстрый и грязный пример, может потребоваться некоторая работа.Тем не менее, я должен признаться, что я немного очарован модулем sched
, поэтому я предложил это.Возможно, вы захотите найти и другие предложения:)
APScheduler
Глядя в Google на решения, подобные тому, что я опубликовал, я нашел этот потрясающий модуль APScheduler ,Это настолько практично и полезно, что могу поспорить, что - ваше решение.Мой предыдущий пример был бы намного проще с этим модулем:
from apscheduler.scheduler import Scheduler
import time
sch = Scheduler()
sch.start()
@sch.interval_schedule(seconds=10)
def print_time():
print "scheduled:", time.time()
sch.unschedule_func(print_time)
while True:
print "main thread:", time.time()
time.sleep(5)
(К сожалению, я не нашел, как запланировать выполнение события только один раз, поэтому событие функции должно быть отменено по расписанию. Бьюсь об заклад, это можно решитьс каким-то декоратором.)