Ошибка при выборе объекта `matlab` в контексте joblib` Parallel` - PullRequest
8 голосов
/ 29 марта 2019

Я параллельно запускаю некоторый код Matlab из контекста Python (я знаю, но это именно то, что происходит), и у меня возникает ошибка импорта, связанная с matlab.double. Тот же код прекрасно работает в multiprocessing.Pool, поэтому у меня возникают проблемы с выяснением, в чем проблема. Вот минимальный воспроизводящий контрольный пример.

import matlab
from multiprocessing import Pool
from joblib import Parallel, delayed

# A global object that I would like to be available in the parallel subroutine
x = matlab.double([[0.0]])

def f(i):
    print(i, x)

with Pool(4) as p:
    p.map(f, range(10))
    # This prints 1, [[0.0]]\n2, [[0.0]]\n... as expected

for _ in Parallel(4, backend='multiprocessing')(delayed(f)(i) for i in range(10)):
    pass
# This also prints 1, [[0.0]]\n2, [[0.0]]\n... as expected

# Now run with default `backend='loky'`
for _ in Parallel(4)(delayed(f)(i) for i in range(10)):
    pass
# ^ this crashes.

Итак, единственный проблемный - тот, который использует 'loky' бэкэнд. Полный возврат:

exception calling callback for <Future at 0x7f63b5a57358 state=finished raised BrokenProcessPool>
joblib.externals.loky.process_executor._RemoteTraceback: 
'''
Traceback (most recent call last):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 391, in _process_worker
    call_item = call_queue.get(block=True, timeout=timeout)
  File "~/miniconda3/envs/myenv/lib/python3.6/multiprocessing/queues.py", line 113, in get
    return _ForkingPickler.loads(res)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/mlarray.py", line 31, in <module>
    from _internal.mlarray_sequence import _MLArrayMetaClass
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_sequence.py", line 3, in <module>
    from _internal.mlarray_utils import _get_strides, _get_size, \
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_utils.py", line 4, in <module>
    import matlab
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/__init__.py", line 24, in <module>
    from mlarray import double, single, uint8, int8, uint16, \
ImportError: cannot import name 'double'
'''

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/_base.py", line 625, in _invoke_callbacks
    callback(self)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 309, in __call__
    self.parallel.dispatch_next()
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 731, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 510, in apply_async
    future = self._workers.submit(SafeFunction(func))
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/reusable_executor.py", line 151, in submit
    fn, *args, **kwargs)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 1022, in submit
    raise self._flags.broken
joblib.externals.loky.process_executor.BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
joblib.externals.loky.process_executor._RemoteTraceback: 
'''
Traceback (most recent call last):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 391, in _process_worker
    call_item = call_queue.get(block=True, timeout=timeout)
  File "~/miniconda3/envs/myenv/lib/python3.6/multiprocessing/queues.py", line 113, in get
    return _ForkingPickler.loads(res)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/mlarray.py", line 31, in <module>
    from _internal.mlarray_sequence import _MLArrayMetaClass
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_sequence.py", line 3, in <module>
    from _internal.mlarray_utils import _get_strides, _get_size, \
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_utils.py", line 4, in <module>
    import matlab
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/__init__.py", line 24, in <module>
    from mlarray import double, single, uint8, int8, uint16, \
ImportError: cannot import name 'double'
'''

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test.py", line 20, in <module>
    for _ in Parallel(4)(delayed(f)(i) for i in range(10)):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 934, in __call__
    self.retrieve()
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 833, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 521, in wrap_future_result
    return future.result(timeout=timeout)
  File "~/miniconda3/envs/myenv/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "~/miniconda3/envs/myenv/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/_base.py", line 625, in _invoke_callbacks
    callback(self)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 309, in __call__
    self.parallel.dispatch_next()
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 731, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 510, in apply_async
    future = self._workers.submit(SafeFunction(func))
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/reusable_executor.py", line 151, in submit
    fn, *args, **kwargs)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 1022, in submit
    raise self._flags.broken
joblib.externals.loky.process_executor.BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.

Глядя на трассировку, кажется, что основной причиной является проблема импорта пакета matlab в дочерний процесс.

Вероятно, стоит отметить, что все это работает нормально, если вместо этого я определил x = np.array([[0.0]]) (после импорта numpy as np). И, конечно же, у основного процесса нет проблем с любым импортом matlab, поэтому я не уверен, почему это сделал бы дочерний процесс.

Я не уверен, связана ли эта ошибка с пакетом matlab или с глобальными переменными и cloudpickle или loky. В моем приложении это помогло бы придерживаться loky, поэтому я буду признателен за любую информацию!

Я должен также отметить, что я использую официальный движок Matlab для Python: https://www.mathworks.com/help/matlab/matlab-engine-for-python.html. Я полагаю, что это может затруднить другим попробовать контрольные примеры, поэтому я хотел бы воспроизвести эту ошибку с тип, отличный от matlab.double, но другого я пока не нашел.

Копаясь больше, я заметил, что процесс импорта пакета matlab более цикличен, чем я ожидал, и я полагаю, что это может быть частью проблемы? Проблема в том, что когда import matlab запускается loky _ForkingPickler, сначала импортируется некоторый файл matlab/mlarray.py, который импортирует некоторые другие файлы, один из которых содержит import matlab, и это вызывает matlab/__init__.py к быть запущенным, который внутри имеет from mlarray import double, single, uint8, ..., что является строкой, которая вызывает сбой.

Может ли быть эта цикличность проблемой? Если так, почему я могу импортировать этот модуль в основной процесс, но не в loky бэкэнд?

1 Ответ

3 голосов
/ 08 апреля 2019

Ошибка вызвана неправильным порядком загрузки глобальных объектов в дочерних процессах. Это хорошо видно в следе _ForkingPickler.loads(res) -> ... -> import matlab -> from mlarray import ... что matlab еще не импортируется, когда глобальная переменная x загружена cloudpickle.

joblib с loky, кажется, обрабатывает модули как обычные глобальные объекты и динамически отправляет их дочерним процессам. joblib не записывает порядок, в котором были определены эти объекты / модули. Поэтому они загружаются (инициализируются) в случайном порядке в дочерних процессах.

Простой обходной путь - вручную выбрать объект matlab и загрузить его после импорта matlab внутри вашей функции.

import matlab
import pickle

px = pickle.dumps(matlab.double([[0.0]]))

def f(i):
    import matlab
    x=pickle.loads(px)
    print(i, x)

Конечно, вы также можете использовать joblib.dumps и loads для сериализации объектов.

Использовать инициализатор

Благодаря предложению @Aaron вы также можете использовать initializer ( для loky ) для импорта Matlab перед загрузкой x.

В настоящее время нет простого API для указания initializer. Поэтому я написал простую функцию:

def with_initializer(self, f_init):
    # Overwrite initializer hook in the Loky ProcessPoolExecutor
    # https://github.com/tomMoral/loky/blob/f4739e123acb711781e46581d5ed31ed8201c7a9/loky/process_executor.py#L850
    hasattr(self._backend, '_workers') or self.__enter__()
    origin_init = self._backend._workers._initializer
    def new_init():
        origin_init()
        f_init()
    self._backend._workers._initializer = new_init if callable(origin_init) else f_init
    return self

Это немного странно, но хорошо работает с текущей версией joblib и loky. Тогда вы можете использовать его как:

import matlab
from joblib import Parallel, delayed

x = matlab.double([[0.0]])

def f(i):
    print(i, x)

def _init_matlab():
    import matlab

with Parallel(4) as p:
    for _ in with_initializer(p, _init_matlab)(delayed(f)(i) for i in range(10)):
        pass

Я надеюсь, что разработчики joblib добавят аргумент initializer в конструктор Parallel в будущем.

...