Multiprocessing.Queue с огромными данными вызывает _wait_for_tstate_lock - PullRequest
0 голосов
/ 27 мая 2019

Исключение возникает в threading._wait_for_tstate_lock, когда я передаю огромные данные между Process и Thread через multiprocessing.Queue.

Мой минимальный рабочий пример сначала выглядит немного сложным - извините.Я объясню.Исходное приложение загружает много (не так важно) файлов в оперативную память.Это делается в отдельном процессе для сохранения ресурсов.Основной поток графического интерфейса не должен зависать.

  1. Графический интерфейс запускает отдельный Thread для предотвращения замораживания цикла событий графического интерфейса.
  2. Это отдельное Threadзатем запускается один Process, который должен выполнить работу.

    a) Этот Thread создает multiprocess.Queue (знайте, что это multiprocessing, а не threading!)

    b) Это дает Process для обмена данными от Process обратно к Thread.

  3. Process выполняет некоторую работу (3 шага) и .put()результат в multiprocessing.Queue.
  4. Когда Process заканчивается, Thread снова вступает во владение и собирает данные из Queue, сохраняя их в своем собственном атрибуте MyThread.result.
  5. Thread говорит главному циклу / потоку графического интерфейса пользователя вызвать функцию обратного вызова, если для этого есть время.
  6. Функция обратного вызова (MyWindow::callback_thread_finished()' ) get the results from MyWindow.thread.result`.

Проблема в том, что если данные, помещенные в Queue, слишком велики, что-то происходит, я не понимаю - MyThread никогда не заканчивается. Я должен отменить приложение через Strg + C.

Я получил несколько подсказок из документов.Но моя проблема в том, что я не полностью понял документацию.Но у меня такое ощущение, что ключ к моим проблемам можно найти там.Пожалуйста, смотрите две красные коробки в " Трубы и очереди " (Python 3.5 документы).Это полный вывод

MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
    t.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Это минимальный рабочий пример

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib


class MyThread (threading.Thread):
    """This thread just starts the process."""
    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback

    def run(self):
        print('Running MyThread...')
        self.result = []

        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        process.join()

        while not queue.empty():
            process_result = queue.get()
            self.result.append(process_result)
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess (multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x'*102048))
        print('MyProcess stoppd.')

class MyWindow (Gtk.Window):
    def __init__(self):
        Gtk.Window.__init__(self)
        self.connect('destroy', Gtk.main_quit)
        GLib.timeout_add(2000, self.do_start)

    def do_start(self):
        print('MyWindow::do_start()')
        # The process need to be started from a separate thread
        # to prevent the main thread (which is the gui main loop)
        # from freezing while waiting for the process result.
        self.thread = MyThread(self.callback_thread_finished)
        self.thread.start()

    def callback_thread_finished(self):
        result = self.thread.result
        for r in result:
            print('{} {}...'.format(r[0], r[1][:10]))

if __name__ == '__main__':
    win = MyWindow()
    win.show_all()
    Gtk.main()

Возможно дублирование, но совсем другое и IMO без ответа для моей ситуации: Thread._wait_for_tstate_lock () никогда не возвращается .

Обходной путь

Использование Manager путем изменения строки 22 до queue = multiprocessing.Manager().Queue() решает проблему.Но я не знаю почему.Я хочу понять этот вопрос, а не только заставить мой код работать.Даже я действительно не знаю, что такое Manager() и имеет ли оно другие (вызывающие проблемы) последствия.

1 Ответ

2 голосов
/ 27 мая 2019

Согласно второму предупреждению в документации, на которую вы ссылаетесь, вы можете зайти в тупик, если присоединитесь к процессу перед обработкой всех элементов в очереди.Таким образом, запуск процесса и немедленное присоединение к нему и , а затем обработка элементов в очереди - неправильный порядок шагов.Вы должны начать процесс, затем получить элементы, и только тогда, когда все элементы будут получены, вы можете вызвать метод соединения.Определите некоторое значение часового индикатора, чтобы указать, что процесс завершил отправку данных через очередь.None например, если это не может быть обычным значением, которое вы ожидаете от процесса.

class MyThread(threading.Thread):
    """This thread just starts the process."""

    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback
        self.result = []

    def run(self):
        print('Running MyThread...')
        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        while True:
            process_result = queue.get()
            if process_result is None:
                break
            self.result.append(process_result)
        process.join()
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x' * 102048))
        self.queue.put(None)
        print('MyProcess stoppd.')
...