Каков наилучший способ повторно выполнять функцию каждые x секунд в Python? - PullRequest
206 голосов
/ 24 января 2009

Я хочу многократно выполнять функцию в Python каждые 60 секунд навсегда (точно так же, как NSTimer в Задаче C). Этот код будет работать как демон, и он по сути похож на вызов сценария python каждую минуту с использованием cron, но без необходимости его настройки пользователем.

В этом вопросе о cron, реализованном в Python , решение, по-видимому, эффективно просто sleep () в течение x секунд. Мне не нужны такие расширенные функциональные возможности, поэтому, возможно, что-то подобное будет работать

while True:
    # Code executed here
    time.sleep(60)

Есть ли возможные проблемы с этим кодом?

Ответы [ 15 ]

185 голосов
/ 24 января 2009

Используйте модуль sched , который реализует планировщик событий общего назначения.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print "Doing stuff..."
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()
137 голосов
/ 12 августа 2014

Просто заблокируйте вашу петлю времени на системные часы. Легко.

import time
starttime=time.time()
while True:
  print "tick"
  time.sleep(60.0 - ((time.time() - starttime) % 60.0))
61 голосов
/ 24 января 2009

Возможно, вы захотите рассмотреть Twisted , которая представляет собой сетевую библиотеку Python, которая реализует Reactor Pattern .

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

Хотя «while True: sleep (60)», вероятно, будет работать, Twisted, вероятно, уже реализует многие функции, которые вам в конечном итоге понадобятся (демонизация, ведение журнала или обработка исключений, как указано в bobince) и, вероятно, будет более надежным решением.

45 голосов
/ 12 июля 2016

Если вы хотите, чтобы неблокирующий способ выполнял вашу функцию периодически, вместо блокирующего бесконечного цикла, я бы использовал потоковый таймер. Таким образом, ваш код может продолжать работать и выполнять другие задачи, при этом ваша функция вызывается каждые n секунд. Я часто использую эту технику для распечатки информации о ходе выполнения длинных задач, интенсивно использующих CPU / Disk / Network.

Вот код, который я разместил в похожем вопросе с элементами управления start () и stop ():

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Использование:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Особенности:

  • Только стандартная библиотека, без внешних зависимостей
  • start() и stop() безопасно вызывать несколько раз, даже если таймер уже запущен / остановлен
  • вызываемая функция может иметь позиционные и именованные аргументы
  • Вы можете изменить interval в любое время, оно вступит в силу после следующего запуска. То же самое для args, kwargs и даже function!
32 голосов
/ 04 ноября 2012

Более простой способ, которым я считаю:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

Таким образом, ваш код выполняется, затем он ждет 60 секунд, затем он выполняется снова, ждет, выполняет и т. Д ... Не надо усложнять вещи: D

15 голосов
/ 05 декабря 2016

Вот обновление кода от MestreLion, которое позволяет избежать дрейфа с течением времени:

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False
10 голосов
/ 12 апреля 2018
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

Если вы хотите сделать это без блокировки оставшегося кода, вы можете использовать его, чтобы запустить в своем собственном потоке:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

Это решение сочетает в себе несколько функций, редко встречающихся в других решениях:

  • Обработка исключений: Насколько это возможно на этом уровне, исключения обрабатываются правильно, i. е. войти в систему для целей отладки, не прерывая нашу программу.
  • Нет цепочки: Общая цепеподобная реализация (для планирования следующего события), которую вы найдете во многих ответах, является хрупкой в ​​том аспекте, что если что-то идет не так в механизме планирования (threading.Timer или чем-то еще ), это прервет цепочку. Тогда дальнейших казней не будет, даже если причина проблемы уже устранена. Простой цикл и ожидание с простым sleep() гораздо надежнее по сравнению.
  • Без смещения: Мое решение точно отслеживает время, в которое оно должно работать. В зависимости от времени выполнения дрейф отсутствует (как и во многих других решениях).
  • Пропуск: Мое решение пропустит задачи, если одно выполнение заняло слишком много времени (например, делать X каждые пять секунд, но X занимало 6 секунд). Это стандартное поведение cron (и не без причины). Многие другие решения затем просто выполняют задачу несколько раз подряд без какой-либо задержки. В большинстве случаев (например, задачи по очистке) это нежелательно. Если это - , просто используйте вместо него next_time += delay.
5 голосов
/ 06 июня 2014

Я столкнулся с подобной проблемой некоторое время назад. Может быть http://cronus.readthedocs.org может помочь?

Для v0.2 работает следующий фрагмент

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec
4 голосов
/ 24 января 2009

Основное различие между этим и cron заключается в том, что исключение убьет демона навсегда. Возможно, вы захотите заключить в ловушку исключений и регистратор.

1 голос
/ 10 сентября 2018

Вот адаптированная версия к коду от MestreLion. В дополнение к исходной функции этот код:

1) добавить first_interval, используемый для срабатывания таймера в определенное время (вызывающий должен вычислить first_interval и передать его)

2) решить условие гонки в исходном коде. В исходном коде, если поток управления не смог отменить запущенный таймер («Остановите таймер и отмените выполнение действия таймера. Это будет работать только в том случае, если таймер все еще находится в стадии ожидания».) 1006 * таймер будет работать бесконечно.

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...