У меня есть класс (MyClass), который содержит очередь (self.msg_queue) действий, которые необходимо выполнить, и у меня есть несколько источников ввода, которые могут добавлять задачи в очередь.
Прямо сейчас я у меня есть три функции, которые я хочу запустить одновременно:
- MyClass.get_input_from_user ()
- Создает окно в tkinter, в котором пользователь заполняет информацию, а когда пользователь нажимает, отправляет его, нажимает это сообщение в очередь.
- MyClass.get_input_from_server ()
- Проверяет сервер на наличие сообщения, читает сообщение и затем помещает его в очередь. Этот метод использует функции из родительского класса MyClass.
- MyClass.execute_next_item_on_the_queue ()
- Извлекает сообщение из очереди и затем воздействует на него. Это зависит от того, что является сообщением, но каждое сообщение соответствует некоторому методу в MyClass или его родительском элементе, который запускается в соответствии с большим деревом решений.
Описание процесса: После класс присоединился к сети, у меня есть три потока (по одному для каждой из вышеперечисленных функций). Каждая поточная функция добавляет элементы из очереди с синтаксисом «self.msg_queue.put (message)» и удаляет элементы из очереди с помощью «self.msg_queue.get_nowait ()».
Описание проблемы: проблема I у меня есть то, что кажется, что каждый поток изменяет свой собственный объект очереди (они не разделяют очередь, msg_queue, класса, членами которого они, функции, являются).
Я не знаком достаточно с Multiprocessing, чтобы знать, какие важные сообщения об ошибках; однако он заявляет, что он не может засолить слабый объект (он не указывает, какой объект является слабым объектом), и что в queue.put () вызывается строка «self._sem.acquire (block, timeout) приводит к «[WinError 5] Доступ запрещен» ». Можно ли предположить, что этот сбой в справочнике очереди не копируется должным образом?
[Я использую Python 3.7.2 и Процесс и очередь пакета Multiprocessing]
[I видели несколько вопросов и ответов о том, как потоки перемещают информацию между классами - создайте основной жгут, который генерирует очередь, а затем передает эту очередь в качестве аргумента каждому потоку Если бы функциям не приходилось использовать другие функции из MyClass, я мог бы увидеть адаптацию этой стратегии, когда эти функции занимают очередь и используют локальную переменную, а не переменные класса.]
[Я вполне уверен, что эта ошибка не является результатом передачи моей очереди объекту tkinter, так как мои модульные тесты показывают, как мой GUI корректно меняет очередь своего вызывающего абонента]
Ниже приведен минимальный воспроизводимый пример ошибки очереди:
from multiprocessing import Queue
from multiprocessing import Process
import queue
import time
class MyTest:
def __init__(self):
self.my_q = Queue()
self.counter = 0
def input_function_A(self):
while True:
self.my_q.put(self.counter)
self.counter = self.counter + 1
time.sleep(0.2)
def input_function_B(self):
while True:
self.counter = 0
self.my_q.put(self.counter)
time.sleep(1)
def output_function(self):
while True:
try:
var = self.my_q.get_nowait()
except queue.Empty:
var = -1
except:
break
print(var)
time.sleep(1)
def run(self):
process_A = Process(target=self.input_function_A)
process_B = Process(target=self.input_function_B)
process_C = Process(target=self.output_function)
process_A.start()
process_B.start()
process_C.start()
# without this it generates the WinError:
# with this it still behaves as if the two input functions do not modify the queue
process_C.join()
if __name__ == '__main__':
test = MyTest()
test.run()