Исключение возникает в threading._wait_for_tstate_lock
, когда я передаю огромные данные между Process
и Thread
через multiprocessing.Queue
.
Мой минимальный рабочий пример сначала выглядит немного сложным - извините.Я объясню.Исходное приложение загружает много (не так важно) файлов в оперативную память.Это делается в отдельном процессе для сохранения ресурсов.Основной поток графического интерфейса не должен зависать.
- Графический интерфейс запускает отдельный
Thread
для предотвращения замораживания цикла событий графического интерфейса. Это отдельное Thread
затем запускается один Process
, который должен выполнить работу.
a) Этот Thread
создает multiprocess.Queue
(знайте, что это multiprocessing
, а не threading
!)
b) Это дает Process
для обмена данными от Process
обратно к Thread
.
Process
выполняет некоторую работу (3 шага) и .put()
результат в multiprocessing.Queue
. - Когда
Process
заканчивается, Thread
снова вступает во владение и собирает данные из Queue
, сохраняя их в своем собственном атрибуте MyThread.result
. -
Thread
говорит главному циклу / потоку графического интерфейса пользователя вызвать функцию обратного вызова, если для этого есть время. - Функция обратного вызова (
MyWindow::callback_thread_finished()' ) get the results from
MyWindow.thread.result`.
Проблема в том, что если данные, помещенные в Queue
, слишком велики, что-то происходит, я не понимаю - MyThread
никогда не заканчивается. Я должен отменить приложение через Strg + C.
Я получил несколько подсказок из документов.Но моя проблема в том, что я не полностью понял документацию.Но у меня такое ощущение, что ключ к моим проблемам можно найти там.Пожалуйста, смотрите две красные коробки в " Трубы и очереди " (Python 3.5 документы).Это полный вывод
MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
t.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
util._exit_function()
File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
_run_finalizers()
File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
finalizer()
File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
thread.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Это минимальный рабочий пример
#!/usr/bin/env python3
import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib
class MyThread (threading.Thread):
"""This thread just starts the process."""
def __init__(self, callback):
threading.Thread.__init__(self)
self._callback = callback
def run(self):
print('Running MyThread...')
self.result = []
queue = multiprocessing.Queue()
process = MyProcess(queue)
process.start()
process.join()
while not queue.empty():
process_result = queue.get()
self.result.append(process_result)
print('MyThread stoppd.')
GLib.idle_add(self._callback)
class MyProcess (multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
print('Running MyProcess...')
for i in range(3):
self.queue.put((i, 'x'*102048))
print('MyProcess stoppd.')
class MyWindow (Gtk.Window):
def __init__(self):
Gtk.Window.__init__(self)
self.connect('destroy', Gtk.main_quit)
GLib.timeout_add(2000, self.do_start)
def do_start(self):
print('MyWindow::do_start()')
# The process need to be started from a separate thread
# to prevent the main thread (which is the gui main loop)
# from freezing while waiting for the process result.
self.thread = MyThread(self.callback_thread_finished)
self.thread.start()
def callback_thread_finished(self):
result = self.thread.result
for r in result:
print('{} {}...'.format(r[0], r[1][:10]))
if __name__ == '__main__':
win = MyWindow()
win.show_all()
Gtk.main()
Возможно дублирование, но совсем другое и IMO без ответа для моей ситуации: Thread._wait_for_tstate_lock () никогда не возвращается .
Обходной путь
Использование Manager путем изменения строки 22 до queue = multiprocessing.Manager().Queue()
решает проблему.Но я не знаю почему.Я хочу понять этот вопрос, а не только заставить мой код работать.Даже я действительно не знаю, что такое Manager()
и имеет ли оно другие (вызывающие проблемы) последствия.