Как исправить 'TypeError: невозможно выбрать _thread.lock объекты' при передаче очереди в поток в дочернем процессе - PullRequest
0 голосов
/ 22 мая 2019

Я застрял в этом вопросе весь день, и я не смог найти каких-либо решений, касающихся того, что я пытаюсь выполнить.

Я пытаюсь передать очереди потокам, порожденным в подпрограмме.-процессов.Очереди были созданы во входном файле и переданы каждому подпроцессу в качестве параметра.

Я делаю модульную программу для: а) запуска нейронной сети б) автоматического обновления моделей сети при необходимости в) регистрациисобытия / изображения из нейронной сети на серверы.Моя бывшая программа боготворила только одно ядро ​​ЦП, работавшее с несколькими потоками, и работала довольно медленно, поэтому я решил, что мне нужно выполнить дополнительную обработку некоторых частей программы, чтобы они могли работать в своих собственных пространствах памяти с максимальной отдачей.

Подпроцесс:

  1. Связь клиент-сервер
  2. Управление веб-камерой и обработка изображений
  3. Создание интерфейса для нейронных сетей (есть 2 нейронные сети со своими собственнымиобрабатывать каждый)

4 всего подпроцесса.

По мере разработки мне нужно общаться через каждый процесс, чтобы они все были на одной странице с событиями с серверов и еще много чего.,Насколько я могу судить, наилучшим вариантом будет Queue.

(Уточнение: 'Queue' из модуля 'multiprocessing', НЕ модуль 'queue')

~~ Однако ~~

Каждый из этих подпроцессов порождает свои собственные потоки.Например, 1-й подпроцесс будет порождать несколько потоков: один поток в очереди для прослушивания событий с разных серверов и передачи их в разные области программы;один поток для прослушивания очереди, получающей изображения из одной из нейронных сетей;один поток для прослушивания очереди, получающей живые изображения с веб-камеры;и один поток для прослушивания очереди, получающей выходные данные из другой нейронной сети.

Я могу без проблем передавать очереди подпроцессам и эффективно их использовать.Однако, когда я пытаюсь передать их потокам внутри каждого подпроцесса, я получаю вышеуказанную ошибку.

Я довольно новичок в многопроцессорности;однако методология, лежащая в его основе, выглядит примерно такой же, как потоки, за исключением пространства совместно используемой памяти и GIL.

Это из Main.py;вход в программу.

from lib.client import Client, Image

from multiprocessing import Queue, Process

class Main():

    def __init__(self, server):

        self.KILLQ = Queue()
        self.CAMERAQ = Queue()

        self.CLIENT = Client((server, 2005), self.KILLQ, self.CAMERAQ)
        self.CLIENT_PROCESS = Process(target=self.CLIENT.do, daemon=True)

        self.CLIENT_PROCESS.start()

if __name__ == '__main__':
    m = Main('127.0.0.1')
    while True:
        m.KILLQ.put("Hello world")

И это из client.py (в папке с именем lib)

class Client():

    def __init__(self, connection, killq, cameraq):

        self.TCP_IP = connection[0]
        self.TCP_PORT = connection[1]

        self.CAMERAQ = cameraq
        self.KILLQ = killq

        self.BUFFERSIZE = 1024
        self.HOSTNAME = socket.gethostname()

        self.ATTEMPTS = 0

        self.SHUTDOWN = False

        self.START_CONNECTION = MakeConnection((self.TCP_IP, self.TCP_PORT))

        # self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,), daemon=True)

        # self.KILLQ_THREAD.start()

    def do(self):
        # The function ran as the subprocess from Main.py
        print(self.KILLQ.get())

    def _listen(self, q):
        # This is threaded multiple times listening to each Queue (as 'q' that is passed when the thread is created)
        while True:
            print(self.q.get())

# self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,), daemon=True)

Здесь выдается ошибка.Если я оставлю эту строку с комментариями, программа будет работать нормально.Я могу читать из очереди в этом подпроцессе без проблем (т. Е. Функция 'do'), не находясь в потоке этого подпроцесса (т. Е. Функция '_listen').

Мне нужно уметьвзаимодействовать через каждый процесс, чтобы они могли идти в ногу с основной программой (т. е. в случае обновления модели нейронной сети, подпроцесс вывода должен завершиться, чтобы модель могла обновляться без ошибок).

Любая помощь с этим была бы великолепна!

Я также очень открыт для других методов общения, которые также будут работать.В случае, если вы считаете, что лучший процесс коммуникации будет работать;он должен быть достаточно быстрым, чтобы поддерживать потоковую передачу изображений 4k, отправляемых на сервер с камеры в режиме реального времени.

Большое спасибо за потраченное время!:)

1 Ответ

1 голос
/ 22 мая 2019

Очередь не проблема. Те из пакета multiprocessing предназначены для выбора, чтобы их можно было разделить между процессами.

Проблема в том, что ваш поток KILLQ_THREAD создан в основном процессе. Потоки не должны быть разделены между процессами. Фактически, когда процесс разветвляется в соответствии со стандартами POSIX, потоки, которые активны в родительском процессе, являются , а не частью образа процесса, который клонируется в пространство памяти нового дочернего элемента. Одна из причин заключается в том, что состояние мьютексов во время вызова fork() может привести к тупикам в дочернем процессе.

Вам придется перенести создание вашего потока в ваш дочерний процесс, т.е.

def do(self):
    self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,), daemon=True)
    self.KILLQ_THREAD.start()

Предположительно, KILLQ должен сигнализировать о завершении дочерних процессов. В этом случае, особенно если вы планируете использовать более одного дочернего процесса, очередь не лучший способ для достижения этой цели. Поскольку Queue.get() и Queue.get_nowait() удаляют элемент из очереди, каждый элемент может быть извлечен и обработан только одним потребителем. Ваш продюсер должен будет поставить несколько сигналов выключения в очередь. В сценарии с несколькими потребителями у вас также нет разумного способа гарантировать, что конкретный потребитель получит какой-либо конкретный товар. Любой элемент, помещенный в очередь, потенциально может быть извлечен любым из читающих из него потребителей.

Для сигнализации, особенно с несколькими получателями, лучше использовать Event

Вы также заметите, что ваша программа быстро зависает после запуска. Это потому, что вы запускаете и ваш дочерний процесс, и поток с daemon=True.

Когда ваш Client.do() метод выглядит так, как описано выше, то есть создает и запускает поток, а затем завершает работу, ваш дочерний процесс завершается сразу после вызова self.KILLQ_THREAD.start(), и демонический поток сразу же завершает его. Ваш основной процесс ничего не замечает и продолжает помещать Hello world в очередь до тех пор, пока он в итоге не заполнится и queue.Full не поднимется.

Вот пример сжатого кода, использующего Event для сигнализации отключения в двух дочерних процессах с одним потоком каждый.

main.py

import time    
from lib.client import Client
from multiprocessing import Process, Event

class Main:

    def __init__(self):
        self.KILLQ = Event()
        self._clients = (Client(self.KILLQ), Client(self.KILLQ))
        self._procs = [Process(target=cl.do, daemon=True) for cl in self._clients]
        [proc.start() for proc in self._procs]

if __name__ == '__main__':
    m = Main()
    # do sth. else
    time.sleep(1)
    # signal for shutdown
    m.KILLQ.set()
    # grace period for both shutdown prints to show
    time.sleep(.1)

client.py

import multiprocessing
from threading import Thread

class Client:

    def __init__(self, killq):
        self.KILLQ = killq

    def do(self):
        # non-daemonic thread! We want the process to stick around until the thread 
        # terminates on the signal set by the main process
        self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,))
        self.KILLQ_THREAD.start()

    @staticmethod
    def _listen(q):
        while not q.is_set():
            print("in thread {}".format(multiprocessing.current_process().name))
        print("{} - master signalled shutdown".format(multiprocessing.current_process().name))

выход

[...]
in thread Process-2
in thread Process-1
in thread Process-2
Process-2 - master signalled shutdown
in thread Process-1
Process-1 - master signalled shutdown

Process finished with exit code 0

Что касается методов межпроцессного взаимодействия, возможно, вы захотите взглянуть на решение для потокового сервера. Мигель Гринберг написал отличный учебник по Потоковое видео с Flask еще в 2014 году, с более поздним продолжением с августа 2017 года .

...