Обновление : может быть (не уверен в этом на данный момент) решение, упомянутое в конце.
Я изо всех сил пытался выяснить, почему исключение, вызванное у Pool
рабочих, не распространялось, т. Е. Оно продолжалось молча. Я добавил довольно много исключений в свой код, чтобы определить, где он может быть в конечном итоге пойман, и обнаружил странное поведение.
У меня есть такой класс:
class Pipeline:
def __call__(self, **kwargs):
raise ValueError()
for module in self.modules:
print("Pipeline: call module: %s" % str(module))
# raise BaseException()
raise ValueError()
o = module(**kwargs)
kwargs.update(o)
return o, kwargs
какие экземпляры вызываются в Pool
(на самом деле pathos.ProcessingPool
не multiprocessing.Pool
не уверен, что это может измениться в этом контексте), т.е.
from pathos.multiprocessing import ProcessingPool as Pool
with Pool(processes=n_thread) as pool:
pool.map(run_exp_args, [...])
def run_exp_args([...]):
[...]
p = Pipeline(...)
o, k = p(**kwargs)
Как и следовало ожидать, это вызывает ValueError
перед входом в цикл, в Pipeline.__call__
(что подтверждается номером строки трассы), он останавливает программу и показывает соответствующую трассировку.
Что странно, так это то, что, если я прокомментирую это первое ValueError
(но оставлю его в цикле), это исключение никогда не будет распространяться, оно просто игнорируется.
Теперь, если я вместо этого подниму BaseException
, раскомментировав строку выше, это исключение будет поднято, я вижу след , но , вся программа не останавливается.
Я экспериментировал с n_thread=1
, чтобы убедиться, что мой вывод актуален.
print
до raise
имеет значение ... (и удаление его в конечном итоге решает проблему)
если я удаляю печать в цикле, отображается ValueError
и программа останавливается.
Так что в основном:
class Pipeline:
def __init__(self, *args):
self.modules = [*args]
def __call__(self, **kwargs):
for module in self.modules:
print()
raise ValueError()
o = module(**kwargs)
kwargs.update(o)
return o, kwargs
работает без ошибок, когда
class Pipeline:
def __init__(self, *args):
self.modules = [*args]
def __call__(self, **kwargs):
for module in self.modules:
raise ValueError()
o = module(**kwargs)
kwargs.update(o)
return o, kwargs
терпит неудачу (что я и хочу!).
Дело в том, что на практике настоящая логика происходит в вызовах o = module(...)
, которые содержат довольно много инструкций, включая print
s и исключения, которые не вызываются.
Это действительно раздражающая проблема, так как я не могу доверять своей программе в параллельном режиме, поэтому мне приходится запускать ее без пула (тем более, что есть важные assert
, которые игнорируются).
У вас есть идеи?
примечания:
(1) идея поднятия BaseException
исходит из Исключение, сгенерированное в многопроцессорном пуле, не обнаружено , но поток действительно не решает истинный вопрос: почему два ValueError
ведут себя по-разному?
Решение (возможно)
Использование: https://gist.github.com/oseiskar/dbd38098038df8944e21b41c42668440, кажется, исправляет мои проблемы ..