Быстрое планирование вызова метода в Python - PullRequest
3 голосов
/ 07 июня 2010

Для какой-то части моего проекта мне нужна система локального планирования процессов, которая позволит мне отложить выполнение метода на несколько секунд. У меня есть тысячи «клиентов» этой системы, поэтому использование threading.Timer для каждой задержки - плохая идея, потому что я быстро достигну предела потока ОС. Я реализовал систему, которая использует только один поток для контроля времени.

Основная идея состоит в том, чтобы сохранить отсортированную очередь задач (time + func + args + kwargs) и использовать одну threading.Timer для планирования / отмены выполнения заголовка этой очереди. Эта схема работает, но я не доволен производительностью. ~ 2000 клиентов, которые планируют фиктивные задачи каждые ~ 10 секунд, заставляют процесс занимать 40% процессорного времени. Глядя на вывод профилировщика, я вижу, что все время тратится на создание новой threading.Timer s, ее запуск и особенно на создание новых потоков.

Я считаю, что есть лучший способ. Теперь я думаю о переписывании LightTimer, чтобы был один поток выполнения, управляемый threading.Event, и несколько потоков синхронизации, которые будут set() событие. Например:

  • Я планирую задачу для вызова через 10 секунд. Задача добавлена ​​в очередь. Сроки # 1 начинается time.sleep(10) до event.set()
  • Затем я планирую задачу для вызова через 11 секунд. Задача добавлена ​​в очередь. Ничего не происходит с потоком синхронизации, он обнаружит новую задачу после пробуждения.
  • Затем я планирую задачу для вызова через 5 секунд. Задача добавляется в очередь. Временной поток № 2 начинается time.sleep(5), потому что # 1 уже спит в течение более длительного интервала.

Надеюсь, вы уловили эту идею. Что вы думаете об этом пути? Есть ли способ лучше? Может быть, я могу использовать некоторые функции системы Linux, чтобы найти оптимальное решение?

Ответы [ 3 ]

2 голосов
/ 07 июня 2010

Вы смотрели на модуль sched в стандартной библиотеке Python? Запуск планировщика в выделенном потоке (и наличие всех запланированных действий: «поместите связанный метод и его аргументы в очередь», из которого потоки в пуле очищают и выполняют его - так же, как я писал в главе «О ореховой скорлупе» о потоках, за исключением того, что в этом случае не было планирования) следует делать то, что вы хотите.

2 голосов
/ 07 июня 2010

Альтернативная реализация, которую вы можете использовать, - это использование метода time.time() для вычисления абсолютного времени выполнения каждой функции в очереди. Поместите это время и вашу функцию для вызова в обертку объекта, которая переопределяет оператор сравнения, используя время выполнения для определения порядка. Затем используйте модуль heapq для поддержания минимальной кучи. Это обеспечит вам эффективную структуру данных, в которой элемент 0 кучи всегда является вашим следующим событием.

Одним из способов реализации реальных вызовов было бы использование отдельного потока для выполнения обратных вызовов. Куча должна быть защищена мьютексом, и вы можете использовать условную переменную для реализации планирования. В бесконечном цикле просто выполните поиск в следующий раз, чтобы выполнить функцию (элемент 0 кучи) и использовать метод условной переменной wait() с таймаутом, установленным на следующее время выполнения. Ваш метод вставки в кучу может затем использовать метод условной переменной notify(), чтобы разбудить поток планирования раньше, если вновь вставленная функция должна появиться раньше, чем самая ранняя из них, уже в куче.

0 голосов
/ 08 июня 2010

Вы вряд ли достигнете лимита потока ОС с "несколькими тысячами клиентов"; Вы можете использовать много ненужной памяти со стеками для всех этих потоков.

Посмотрите, что делает витая, она позволяет процессу мультиплексировать множество событий (включая таймеры) таким образом, который, как оказалось, довольно хорошо работает с большим количеством событий.

Вы также можете комбинировать управляемые событиями и многопроцессорные модели, запустив несколько процессов на машину и выполнив управляемую событиями логику в каждом из них - скажем, один процесс может обрабатывать 2000 клиентов, вы все равно можете запустить 30-кратные процессы (при условии, что есть достаточный общий ресурс) и получить лучшую пропускную способность, особенно на современном многоядерном оборудовании.

...