Python multiprocessing.Queue взаимоблокировки на пут и получить - PullRequest
6 голосов
/ 26 июня 2010

У меня проблемы с взаимоблокировкой с этим фрагментом кода:


def _entropy_split_parallel(data_train, answers_train, weights):
    CPUS = 1 #multiprocessing.cpu_count()
    NUMBER_TASKS = len(data_train[0])
    processes = []

    multi_list = zip(data_train, answers_train, weights)

    task_queue = multiprocessing.Queue()
    done_queue = multiprocessing.Queue()

    for feature_index in xrange(NUMBER_TASKS):
        task_queue.put(feature_index)

    for i in xrange(CPUS):
        process = multiprocessing.Process(target=_worker, 
                args=(multi_list, task_queue, done_queue))
        processes.append(process)
        process.start()

    min_entropy = None
    best_feature = None
    best_split = None
    for i in xrange(NUMBER_TASKS):
        entropy, feature, split = done_queue.get()
        if (entropy < min_entropy or min_entropy == None) and entropy != None:
            best_feature = feature
            best_split = split

    for i in xrange(CPUS):
        task_queue.put('STOP')

    for process in processes:
        process.join()

    return best_feature, best_split


def _worker(multi_list, task_queue, done_queue):
    feature_index = task_queue.get()
    while feature_index != 'STOP':
        result = _entropy_split3(multi_list, feature_index)
        done_queue.put(result)
        feature_index = task_queue.get()

Когда я запускаю свою программу, она отлично работает в течение нескольких прогонов через _entropy_split_parallel, но в конечном итоге заходит в тупик.Родительский процесс блокируется на done_queue.get(), а рабочий процесс блокируется на done_queue.put().Поскольку в этом случае очередь всегда пуста, ожидается блокировка на get.Я не понимаю, почему работник блокирует put, поскольку очередь, очевидно, не заполнена (пусто!).Я попробовал аргументы ключевых слов block и timeout, но получил тот же результат.

Я использую многопроцессорный бэкпорт, так как я застрял в Python 2.5.


РЕДАКТИРОВАТЬ: похоже, у меня также возникают проблемы взаимоблокировки с одним из примеров, поставляемых с модулем многопроцессорной обработки.Это третий пример снизу здесь. Дедлокировка, по-видимому, возникает, только если я вызываю тестовый метод много раз.Например, изменив нижнюю часть скрипта следующим образом:


if __name__ == '__main__':
    freeze_support()
    for x in xrange(1000):
        test()

РЕДАКТИРОВАТЬ: я знаю, что это старый вопрос, но тестирование показывает, что это больше не проблема для Windows с Python 2.7.Я попробую Linux и сообщу.

Ответы [ 2 ]

4 голосов
/ 26 августа 2010

Я думаю, что проблема в том, что родительский поток присоединяется к дочернему потоку, которому он передал очередь. Это обсуждается в разделе руководства по программированию многопроцессорного модуля *.

Во всяком случае, я столкнулся с тем же симптомом, который вы описали, и когда я реорганизовал свою логику, чтобы основной поток не присоединился к дочерним потокам, не было тупиковой ситуации. Моя измененная логика включала знание количества элементов, которые я должен получить из результатов или очереди «выполнено» (что можно предсказать на основе количества дочерних потоков или количества элементов в рабочей очереди и т. Д.), И цикл бесконечно, пока все это не будет собрано.

«Игрушечная» иллюстрация логики:

num_items_expected = figure_it_out(work_queue, num_threads)
items_received = []
while len(items_received) < num_items_expected:
    items_received.append(done_queue.get())
    time.sleep(5)

Приведенная выше логика устраняет необходимость присоединения родительского потока к дочернему потоку, но позволяет блокировать родительский поток до тех пор, пока все дочерние элементы не будут выполнены. Этот подход избежал моих тупиковых проблем.

0 голосов
/ 10 июня 2011

Эта проблема исчезла с более новыми версиями Python, поэтому я предполагаю, что это была проблема с backport. В любом случае, это больше не проблема.

...