Я пытаюсь использовать многопроцессорность, чтобы отфильтровать некоторые вещи из каждого df в списке фреймов данных:
# Accepts and returns a list of dataframes
dataframes = myClass.filter_calibration(dataframes)
в myClass:
self.n_cores = -2
self.backend = 'loky'
def filter_calibration(self, dataframes, verbose=True):
results = Parallel(n_jobs=self.n_cores, backend=self.backend, verbose=verbose)(
delayed(helperFunctions.filter_calibration_helper)(df) for df in dataframes)
return results
В отдельном .py-файле с вспомогательными функциями (не в классе)
def filter_calibration_helper(df):
if True in df['Calibrating'].unique():
calib = np.array(df['Calibrating'])
valids = np.array(df['Valid'])
calib_indices = np.argwhere(calib == True)
valids[calib_indices] = False
df['Valid'] = valids
else:
calib = np.array(df['Calibrating'])
valids = np.array(df['Valid'])
calib_indices = np.argwhere(calib == 1)
valids[calib_indices] = False
df['Valid'] = valids
return df
Однако я получаю сообщение об ошибке:
File "/Users/ima/example_script.py", line 45, in <module>
dataframes = myClass.filter_calibration(dataframes)
File "/Users/ima/myClass.py", line 59, in filter_calibration
delayed(helperFunctions.filter_calibration_helper)(df) for df in zip(dataframes))
File "/Users/ima/anaconda3/lib/python3.7/site-packages/joblib/parallel.py", line 934, in __call__
self.retrieve()
File "/Users/ima/anaconda3/lib/python3.7/site-packages/joblib/parallel.py", line 833, in retrieve
self._output.extend(job.get(timeout=self.timeout))
File "/Users/ima/anaconda3/lib/python3.7/site-packages/joblib/_parallel_backends.py", line 521, in wrap_future_result
return future.result(timeout=timeout)
File "/Users/ima/anaconda3/lib/python3.7/concurrent/futures/_base.py", line 432, in result
return self.__get_result()
File "/Users/ima/anaconda3/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
И по жизни я не могу понять, что происходит не так. Единственный аргумент - это датафрейм, который отлично работает с Parallel в некоторых других функциях моего кода.
Я тоже пробовал бэкэнд multiprocessing
joblib, но он просто останавливается.
Вся помощь приветствуется!