Как разрешить одновременное изменение переменных класса несколькими потоками - PullRequest
1 голос
/ 22 апреля 2020

У меня есть класс (MyClass), который содержит очередь (self.msg_queue) действий, которые необходимо выполнить, и у меня есть несколько источников ввода, которые могут добавлять задачи в очередь.

Прямо сейчас я у меня есть три функции, которые я хочу запустить одновременно:

  • MyClass.get_input_from_user ()
    • Создает окно в tkinter, в котором пользователь заполняет информацию, а когда пользователь нажимает, отправляет его, нажимает это сообщение в очередь.
  • MyClass.get_input_from_server ()
    • Проверяет сервер на наличие сообщения, читает сообщение и затем помещает его в очередь. Этот метод использует функции из родительского класса MyClass.
  • MyClass.execute_next_item_on_the_queue ()
    • Извлекает сообщение из очереди и затем воздействует на него. Это зависит от того, что является сообщением, но каждое сообщение соответствует некоторому методу в MyClass или его родительском элементе, который запускается в соответствии с большим деревом решений.

Описание процесса: После класс присоединился к сети, у меня есть три потока (по одному для каждой из вышеперечисленных функций). Каждая поточная функция добавляет элементы из очереди с синтаксисом «self.msg_queue.put (message)» и удаляет элементы из очереди с помощью «self.msg_queue.get_nowait ()».

Описание проблемы: проблема I у меня есть то, что кажется, что каждый поток изменяет свой собственный объект очереди (они не разделяют очередь, msg_queue, класса, членами которого они, функции, являются).

Я не знаком достаточно с Multiprocessing, чтобы знать, какие важные сообщения об ошибках; однако он заявляет, что он не может засолить слабый объект (он не указывает, какой объект является слабым объектом), и что в queue.put () вызывается строка «self._sem.acquire (block, timeout) приводит к «[WinError 5] Доступ запрещен» ». Можно ли предположить, что этот сбой в справочнике очереди не копируется должным образом?

[Я использую Python 3.7.2 и Процесс и очередь пакета Multiprocessing]

[I видели несколько вопросов и ответов о том, как потоки перемещают информацию между классами - создайте основной жгут, который генерирует очередь, а затем передает эту очередь в качестве аргумента каждому потоку Если бы функциям не приходилось использовать другие функции из MyClass, я мог бы увидеть адаптацию этой стратегии, когда эти функции занимают очередь и используют локальную переменную, а не переменные класса.]

[Я вполне уверен, что эта ошибка не является результатом передачи моей очереди объекту tkinter, так как мои модульные тесты показывают, как мой GUI корректно меняет очередь своего вызывающего абонента]

Ниже приведен минимальный воспроизводимый пример ошибки очереди:

from multiprocessing import Queue
from multiprocessing import Process
import queue
import time

class MyTest:
    def __init__(self):
        self.my_q = Queue()
        self.counter = 0

    def input_function_A(self):
        while True:
            self.my_q.put(self.counter)
            self.counter = self.counter + 1
            time.sleep(0.2)

    def input_function_B(self):
        while True:
            self.counter = 0
            self.my_q.put(self.counter)
            time.sleep(1)

    def output_function(self):
        while True:
            try:
                var = self.my_q.get_nowait()
            except queue.Empty:
                var = -1
            except:
                break
            print(var)
            time.sleep(1)

    def run(self):
        process_A = Process(target=self.input_function_A)
        process_B = Process(target=self.input_function_B)
        process_C = Process(target=self.output_function)

        process_A.start()
        process_B.start()
        process_C.start()

        # without this it generates the WinError: 
        # with this it still behaves as if the two input functions do not modify the queue
        process_C.join() 

if __name__ == '__main__':
    test = MyTest()
    test.run()

1 Ответ

1 голос
/ 22 апреля 2020

Действительно - это не «потоки» - это «процессы» - в то время как если бы вы использовали многопоточность, а не многопроцессорность, экземпляр self.my_q был бы тем же объектом , помещенным в тот же пространство памяти на компьютере, многопроцессорность выполняет fork процесса, и любые данные в исходном процессе (те, что выполняются в вызове "run") будут дублироваться при его использовании - поэтому каждый Подпроцесс увидит свой собственный экземпляр «Очередь», не связанный с остальными.

Правильный способ, которым различные процессы совместно используют объект мультипроцессинга. Вопрос - передать его в качестве параметра целевым методам. Таким образом, более простой способ реорганизации вашего кода таким образом:

from multiprocessing import Queue
from multiprocessing import Process
import queue
import time

class MyTest:
    def __init__(self):
        self.my_q = Queue()
        self.counter = 0

    def input_function_A(self, queue):
        while True:
            queue.put(self.counter)
            self.counter = self.counter + 1
            time.sleep(0.2)

    def input_function_B(self, queue):
        while True:
            self.counter = 0
            queue.put(self.counter)
            time.sleep(1)

    def output_function(self, queue):
        while True:
            try:
                var = queue.get_nowait()
            except queue.Empty:
                var = -1
            except:
                break
            print(var)
            time.sleep(1)

    def run(self):
        process_A = Process(target=self.input_function_A, args=(queue,))
        process_B = Process(target=self.input_function_B, args=(queue,))
        process_C = Process(target=self.output_function, args=(queue,))

        process_A.start()
        process_B.start()
        process_C.start()

        # without this it generates the WinError: 
        # with this it still behaves as if the two input functions do not modify the queue
        process_C.join() 

if __name__ == '__main__':
    test = MyTest()
    test.run()

Как вы можете видеть, поскольку ваш класс фактически не разделяет какие-либо данные через атрибуты экземпляра, этот дизайн "класса" не делает много смысла для вашего приложения - но для группировки разных работников в одном блоке кода.

Можно было бы иметь магический c -многопроцесс-класс, который имел бы некоторый внутренний метод для фактического запуска рабочих-методов и совместного использования экземпляра очереди - так что если у вас их много в Проект, было бы намного меньше шаблонного.

Что-то вместе:

from multiprocessing import Queue
from multiprocessing import Process
import time


class MPWorkerBase:
    def __init__(self, *args, **kw):
        self.queue = None
        self.is_parent_process = False
        self.is_child_process = False
        self.processes = []
        # ensure this can be used as a colaborative mixin
        super().__init__(*args, **kw)

    def run(self):
        if self.is_parent_process or self.is_child_process:
            # workers already initialized
            return

        self.queue = Queue()
        processes = []

        cls = self.__class__
        for name in dir(cls):
            method = getattr(cls, name)
            if callable(method) and getattr(method, "_MP_worker", False):
                process = Process(target=self._start_worker, args=(self.queue, name))
                self.processes.append(process)
                process.start()
        # Setting these attributes here ensure the child processes have the initial values for them.
        self.is_parent_process = True
        self.processes = processes

    def _start_worker(self, queue, method_name):

        # this method is called in a new spawned process - attribute
        # changes here no longer reflect attributes on the
        # object in the initial process

        # overwrite queue in this process with the queue object sent over the wire:
        self.queue = queue
        self.is_child_process = True
        # call the worker method
        getattr(self, method_name)()

    def __del__(self):
        for process in self.processes:
            process.join()


def worker(func):
    """decorator to mark a method as a worker that should
    run in its own subprocess
    """

    func._MP_worker = True
    return func


class MyTest(MPWorkerBase):
    def __init__(self):
        super().__init__()
        self.counter = 0

    @worker
    def input_function_A(self):
        while True:
            self.queue.put(self.counter)
            self.counter = self.counter + 1
            time.sleep(0.2)

    @worker
    def input_function_B(self):
        while True:
            self.counter = 0
            self.queue.put(self.counter)
            time.sleep(1)

    @worker
    def output_function(self):
        while True:
            try:
                var = self.queue.get_nowait()
            except queue.Empty:
                var = -1
            except:
                break
            print(var)
            time.sleep(1)


if __name__ == '__main__':
    test = MyTest()
    test.run()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...