Как раскошелиться и объединить несколько подпроцессов с глобальным тайм-аутом в Python? - PullRequest
2 голосов
/ 12 июня 2019

Я хочу выполнить несколько задач параллельно в нескольких подпроцессах и установить тайм-аут, если задачи не были выполнены в течение некоторой задержки.

Первый подход заключается в разветвлении и объединении подпроцессов по отдельности с оставшимися тайм-аутами, рассчитанными относительно глобального тайм-аута, как предложено в в этом ответе . У меня отлично работает.

Второй подход, который я хочу использовать здесь, состоит в создании пула подпроцессов и ожидании с глобальным таймаутом, как предложено в этом ответе .

Однако у меня есть проблема со вторым подходом: после заполнения пула подпроцессов задачами, имеющими multiprocessing.Event() объекты, ожидание их завершения вызывает это исключение:

RuntimeError: объекты условий должны быть общими только для процессов через наследование

Вот фрагмент кода Python:

import multiprocessing.pool
import time


class Worker:

    def __init__(self):
        self.event = multiprocessing.Event()  # commenting this removes the RuntimeError

    def work(self, x):
        time.sleep(1)
        return x * 10


if __name__ == "__main__":
    pool_size = 2
    timeout = 5

    with multiprocessing.pool.Pool(pool_size) as pool:
        result = pool.map_async(Worker().work, [4, 5, 2, 7])
        print(result.get(timeout))  # raises the RuntimeError

1 Ответ

0 голосов
/ 12 июня 2019

В разделе "Руководства по программированию" документации multiprocessing - параллелизм на основе процесса есть следующий абзац:

Лучшенаследовать, чем pickle / unpickle

При использовании методов запуска spawn или forkserver многие типы из multiprocessing должны быть доступны для выбора, чтобы их могли использовать дочерние процессы.Однако, как правило, следует избегать отправки общих объектов другим процессам с использованием каналов или очередей.Вместо этого вы должны расположить программу так, чтобы процесс, которому необходим доступ к общему ресурсу, созданному в другом месте, мог унаследовать его от процесса-предка.

Таким образом, multiprocessing.Event() вызвал RuntimeError, потому что его нельзя выбратьКак показано в следующем фрагменте кода Python:

import multiprocessing
import pickle

pickle.dumps(multiprocessing.Event())

, который вызывает то же исключение:

RuntimeError: объекты условий должны совместно использоваться только между процессами посредством наследования

Решение заключается в использовании прокси-объекта :

Прокси-объект - это объект, который ссылается на общий объект, который (предположительно) живет в другом процессе.

потому что:

Важной особенностью прокси-объектов является возможность их выбора, чтобы их можно было передавать между процессами.

multiprocessing.Manager().Event() создает общий объект threading.Event() и возвращает для него прокси-сервер, поэтому заменив эту строку:

self.event = multiprocessing.Event()

следующей строкой во фрагменте кода Python для решения вопросапроблема:

self.event = multiprocessing.Manager().Event()
...