Ищет вывод нескольких выходов (или возвратов) из параллелизованной функции в Python. Получение какой-то ошибки травления. Обратите внимание, я также хотел бы видеть распечатки x в подпроцессах.
%%time
from math import sqrt
from joblib import Parallel, delayed
# pip install joblib
import multiprocessing
num_cores = multiprocessing.cpu_count()
def producer():
allOutput = []
allX = []
for x in range(0, 10):
x = x*5
print('Produced %s' % x)
output = sqrt(x)
print('Output sqrt %s' % output)
allOutput.append(output)
allX.append(x)
return allOutput, allX
allOutput, allX = Parallel(n_jobs=num_cores)(delayed(i) for i in producer())
И выходные данные должны быть двумя списками с x*5
и sqrt(x*5)
.
Output sqrt 0.0
Produced 5
Output sqrt 2.23606797749979
Produced 10
Output sqrt 3.1622776601683795
Produced 15
Output sqrt 3.872983346207417
Produced 20
Output sqrt 4.47213595499958
Produced 25
Output sqrt 5.0
Produced 30
Output sqrt 5.477225575051661
Produced 35
Output sqrt 5.916079783099616
Produced 40
Output sqrt 6.324555320336759
Produced 45
Output sqrt 6.708203932499369
---------------------------------------------------------------------------
_RemoteTraceback Traceback (most recent call last)
_RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/joblib/externals/loky/backend/queues.py", line 151, in _feed
obj, reducers=reducers)
File "/usr/local/lib/python3.6/dist-packages/joblib/externals/loky/backend/reduction.py", line 145, in dumps
p.dump(obj)
File "/usr/local/lib/python3.6/dist-packages/joblib/parallel.py", line 290, in __getstate__
for func, args, kwargs in self.items]
File "/usr/local/lib/python3.6/dist-packages/joblib/parallel.py", line 290, in <listcomp>
for func, args, kwargs in self.items]
TypeError: 'function' object is not iterable
"""
The above exception was the direct cause of the following exception:
PicklingError Traceback (most recent call last)
<ipython-input-29-ceae606b0c70> in <module>()
----> 1 get_ipython().run_cell_magic('time', '', "from math import sqrt\nfrom joblib import Parallel, delayed\n\ndef producer():\n \n for x in range(0, 10):\n \n x = x*5\n print('Produced %s' % x)\n output = sqrt(x)\n print('Output sqrt %s' % output)\n yield output, x\n\noutSqrt, outX = Parallel(n_jobs=2)(delayed(i) for i in producer()) \n")
/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
2115 magic_arg_s = self.var_expand(line, stack_depth)
2116 with self.builtin_trap:
-> 2117 result = fn(magic_arg_s, cell)
2118 return result
2119
</usr/local/lib/python3.6/dist-packages/decorator.py:decorator-gen-60> in time(self, line, cell, local_ns)
/usr/local/lib/python3.6/dist-packages/IPython/core/magic.py in <lambda>(f, *a, **k)
186 # but it's overkill for just that one bit of state.
187 def magic_deco(arg):
--> 188 call = lambda f, *a, **k: f(*a, **k)
189
190 if callable(arg):
/usr/local/lib/python3.6/dist-packages/IPython/core/magics/execution.py in time(self, line, cell, local_ns)
1191 else:
1192 st = clock2()
-> 1193 exec(code, glob, local_ns)
1194 end = clock2()
1195 out = None
<timed exec> in <module>()
/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in __call__(self, iterable)
994
995 with self._backend.retrieval_context():
--> 996 self.retrieve()
997 # Make sure that we get a last message telling us we are done
998 elapsed_time = time.time() - self._start_time
/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in retrieve(self)
897 try:
898 if getattr(self._backend, 'supports_timeout', False):
--> 899 self._output.extend(job.get(timeout=self.timeout))
900 else:
901 self._output.extend(job.get())
/usr/local/lib/python3.6/dist-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
515 AsyncResults.get from multiprocessing."""
516 try:
--> 517 return future.result(timeout=timeout)
518 except LokyTimeoutError:
519 raise TimeoutError()
/usr/lib/python3.6/concurrent/futures/_base.py in result(self, timeout)
430 raise CancelledError()
431 elif self._state == FINISHED:
--> 432 return self.__get_result()
433 else:
434 raise TimeoutError()
/usr/lib/python3.6/concurrent/futures/_base.py in __get_result(self)
382 def __get_result(self):
383 if self._exception:
--> 384 raise self._exception
385 else:
386 return self._result
PicklingError: Could not pickle the task to send it to the workers.