Есть несколько проблем с вашим кодом.Во-первых, потоки в CPython не запускают код Python «одновременно» из-за глобальной блокировки интерпретатора ( GIL ).Поток должен содержать GIL для выполнения байт-кода Python.По умолчанию поток удерживает GIL до 5 мс (Python 3.2+), если он не отбрасывает его раньше, потому что он блокирует ввод / вывод.Для параллельного выполнения кода Python вам придется использовать multiprocessing
.
. Вам также не нужно использовать Manager.Queue
вместо queue.Queue
.Manager.Queue
- это queue.Queue
в отдельном менеджере-процессе.Вы ввели здесь объезд с IPC и копированием памяти без какой-либо выгоды.
Причина вашего тупика заключается в том, что у вас есть состояние гонки здесь:
if not workQueue.empty():
data = q.get()
Это не атомарная операция.Поток может проверить workQueue.empty()
, затем сбросить GIL, позволяя другому потоку опустошить очередь, а затем перейти к data = q.get()
, который будет блокироваться навсегда, если вы не добавите что-либо в очередь снова.Queue.empty()
проверки являются общим анти-паттерном, и его не нужно использовать.Используйте ядовитые таблетки (Sentinel-значения), чтобы вместо этого разорвать цикл get и дать работникам понять, что они должны выйти.Вам нужно столько же дозорных ценностей, сколько и работников.Узнайте больше о iter(callabel, sentinel)
здесь .
import time
from queue import Queue
from datetime import datetime
from threading import Thread, current_thread
SENTINEL = 'SENTINEL'
class myThread(Thread):
def __init__(self, func, inqueue):
super().__init__()
self.func = func
self._inqueue = inqueue
def run(self):
print(f"{datetime.now()} {current_thread().name} starting")
self.func(self._inqueue)
print(f"{datetime.now()} {current_thread().name} exiting")
def process_data(_inqueue):
for data in iter(_inqueue.get, SENTINEL):
print(f"{datetime.now()} {current_thread().name} "
f"processing {data}")
time.sleep(1)
if __name__ == '__main__':
N_WORKERS = 3
inqueue = Queue()
input_data = ["One", "Two", "Three", "Four", "Five"]
sentinels = [SENTINEL] * N_WORKERS # one sentinel value per worker
# enqueue input and sentinels
for word in input_data + sentinels:
inqueue.put(word)
threads = [myThread(process_data, inqueue) for _ in range(N_WORKERS)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"{datetime.now()} {current_thread().name} exiting")
Пример вывода:
2019-02-14 17:58:18.265208 Thread-1 starting
2019-02-14 17:58:18.265277 Thread-1 processing One
2019-02-14 17:58:18.265472 Thread-2 starting
2019-02-14 17:58:18.265542 Thread-2 processing Two
2019-02-14 17:58:18.265691 Thread-3 starting
2019-02-14 17:58:18.265793 Thread-3 processing Three
2019-02-14 17:58:19.266417 Thread-1 processing Four
2019-02-14 17:58:19.266632 Thread-2 processing Five
2019-02-14 17:58:19.266767 Thread-3 exiting
2019-02-14 17:58:20.267588 Thread-1 exiting
2019-02-14 17:58:20.267861 Thread-2 exiting
2019-02-14 17:58:20.267994 MainThread exiting
Process finished with exit code 0
Если вы не настаиваете на создании подкласса Thread
, вы также можетепросто используйте multiprocessing.pool.ThreadPool
aka multiprocessing.dummy.Pool
, который делает сантехнику для вас в фоновом режиме.