Многопоточность зависает при использовании `thread.join ()` - PullRequest
0 голосов
/ 14 февраля 2019

Я пытаюсь настроить 3 потока и выполнить 5 задач в очереди.Идея состоит в том, что потоки сначала будут запускать первые 3 задачи одновременно, затем 2 потока завершают оставшиеся 2. Но программа, кажется, зависает.Я не мог обнаружить ничего плохого в этом.

from multiprocessing import Manager
import threading
import time
global exitFlag 
exitFlag = 0


class myThread(threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)


def process_data(threadName, q):
    global exitFlag
    while not exitFlag:
        if not workQueue.empty():
            data = q.get()
            print("%s processing %s" % (threadName, data))
        else:
            pass
        time.sleep(1)
    print('Nothing to Process')


threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Manager().Queue(10)
threads = []
threadID = 1

# create thread
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# fill up queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# wait queue clear
while not workQueue.empty():
    pass
# notify thread exit
exitFlag = 1
# wait for all threads to finish
for t in threads:
    t.join()
print("Exiting Main Thread")

Я не знаю, что именно произошло, но после удаления части join() программа может работать просто весело.Что я не понимаю, так это то, что exitFlag должен был посылать сигнал, когда очередь очищается.Так что кажется, что-то сигнал не был обнаружен process_data ()

1 Ответ

0 голосов
/ 14 февраля 2019

Есть несколько проблем с вашим кодом.Во-первых, потоки в CPython не запускают код Python «одновременно» из-за глобальной блокировки интерпретатора ( GIL ).Поток должен содержать GIL для выполнения байт-кода Python.По умолчанию поток удерживает GIL до 5 мс (Python 3.2+), если он не отбрасывает его раньше, потому что он блокирует ввод / вывод.Для параллельного выполнения кода Python вам придется использовать multiprocessing.

. Вам также не нужно использовать Manager.Queue вместо queue.Queue.Manager.Queue - это queue.Queue в отдельном менеджере-процессе.Вы ввели здесь объезд с IPC и копированием памяти без какой-либо выгоды.

Причина вашего тупика заключается в том, что у вас есть состояние гонки здесь:

    if not workQueue.empty():
        data = q.get()

Это не атомарная операция.Поток может проверить workQueue.empty(), затем сбросить GIL, позволяя другому потоку опустошить очередь, а затем перейти к data = q.get(), который будет блокироваться навсегда, если вы не добавите что-либо в очередь снова.Queue.empty() проверки являются общим анти-паттерном, и его не нужно использовать.Используйте ядовитые таблетки (Sentinel-значения), чтобы вместо этого разорвать цикл get и дать работникам понять, что они должны выйти.Вам нужно столько же дозорных ценностей, сколько и работников.Узнайте больше о iter(callabel, sentinel) здесь .

import time
from queue import Queue
from datetime import datetime
from threading import Thread, current_thread


SENTINEL = 'SENTINEL'


class myThread(Thread):

    def __init__(self, func, inqueue):
        super().__init__()
        self.func = func
        self._inqueue = inqueue

    def run(self):
        print(f"{datetime.now()} {current_thread().name} starting")
        self.func(self._inqueue)
        print(f"{datetime.now()} {current_thread().name} exiting")


def process_data(_inqueue):
    for data in iter(_inqueue.get, SENTINEL):
        print(f"{datetime.now()} {current_thread().name} "
              f"processing {data}")
        time.sleep(1)


if __name__ == '__main__':


    N_WORKERS = 3

    inqueue = Queue()
    input_data = ["One", "Two", "Three", "Four", "Five"]

    sentinels = [SENTINEL] * N_WORKERS # one sentinel value per worker
    # enqueue input and sentinels
    for word in input_data +  sentinels:
        inqueue.put(word)

    threads = [myThread(process_data, inqueue) for _ in range(N_WORKERS)]

    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(f"{datetime.now()} {current_thread().name} exiting")

Пример вывода:

2019-02-14 17:58:18.265208 Thread-1 starting
2019-02-14 17:58:18.265277 Thread-1 processing One
2019-02-14 17:58:18.265472 Thread-2 starting
2019-02-14 17:58:18.265542 Thread-2 processing Two
2019-02-14 17:58:18.265691 Thread-3 starting
2019-02-14 17:58:18.265793 Thread-3 processing Three
2019-02-14 17:58:19.266417 Thread-1 processing Four
2019-02-14 17:58:19.266632 Thread-2 processing Five
2019-02-14 17:58:19.266767 Thread-3 exiting
2019-02-14 17:58:20.267588 Thread-1 exiting
2019-02-14 17:58:20.267861 Thread-2 exiting
2019-02-14 17:58:20.267994 MainThread exiting

Process finished with exit code 0

Если вы не настаиваете на создании подкласса Thread, вы также можетепросто используйте multiprocessing.pool.ThreadPool aka multiprocessing.dummy.Pool, который делает сантехнику для вас в фоновом режиме.

...