Учитывая количество потоков, я хочу ограничить частоту обращений к рабочей функции скоростью, скажем, один в секунду.
Моя идея состояла в том, чтобы отслеживать последний раз, когда был сделан вызов во всехтемы и сравнить это с текущим временем в каждой теме.Тогда если current_time - last_time < rate
.Я позволил нити немного поспать.Что-то не так с моей реализацией - я предполагаю, что, возможно, неправильно понял, как работают блокировки.
Мой код:
from Queue import Queue
from threading import Thread, Lock, RLock
import time
num_worker_threads = 2
rate = 1
q = Queue()
lock = Lock()
last_time = [time.time()]
def do_work(i, idx):
# Do work here, print is just a dummy.
print('Thread: {0}, Item: {1}, Time: {2}'.format(i, idx, time.time()))
def worker(i):
while True:
lock.acquire()
current_time = time.time()
interval = current_time - last_time[0]
last_time[0] = current_time
if interval < rate:
time.sleep(rate - interval)
lock.release()
item = q.get()
do_work(i, item)
q.task_done()
for i in range(num_worker_threads):
t = Thread(target=worker, args=[i])
t.daemon = True
t.start()
for item in xrange(10):
q.put(item)
q.join()
Я ожидал увидеть один вызов в секунду do_work
, однако, я получаю в основном 2 звонка одновременно (по 1 для каждого потока) с последующей паузой в одну секунду.Что случилось?
Хорошо, некоторые редактируют.Совет просто снизить скорость, с которой предметы помещаются в очередь, был хорошим, однако я вспомнил, что мне нужно было позаботиться о случае, когда работники повторно добавляли предметы в очередь.Канонический пример: разбиение на страницы или откат-повтор в сетевых задачах.Я придумал следующее.Я предполагаю, что для реальных сетевых задач библиотеки eventlet / gevent могут быть проще на ресурсах, но это только пример.Он в основном использует приоритетную очередь для накапливания запросов и использует дополнительный поток для перемещения элементов из кучи в реальную очередь задач с равномерной скоростью.Я смоделировал повторную вставку в кучу работниками, затем заново обработанные предметы обрабатываются.
import sys
import os
import time
import random
from Queue import Queue, PriorityQueue
from threading import Thread
rate = 0.1
def worker(q, q_pile, idx):
while True:
item = q.get()
print("Thread: {0} processed: {1}".format(item[1], idx))
if random.random() > 0.3:
print("Thread: {1} reinserting item: {0}".format(item[1], idx))
q_pile.put((-1 * time.time(), item[1]))
q.task_done()
def schedule(q_pile, q):
while True:
if not q_pile.empty():
print("Items on pile: {0}".format(q_pile.qsize()))
q.put(q_pile.get())
q_pile.task_done()
time.sleep(rate)
def main():
q_pile = PriorityQueue()
q = Queue()
for i in range(5):
t = Thread(target=worker, args=[q, q_pile, i])
t.daemon = True
t.start()
t_schedule = Thread(target=schedule, args=[q_pile, q])
t_schedule.daemon = True
t_schedule.start()
[q_pile.put((-1 * time.time(), i)) for i in range(10)]
q_pile.join()
q.join()
if __name__ == '__main__':
main()