Несколько возвратов и распечаток из параллельной функции Python joblib - PullRequest
0 голосов
/ 05 марта 2019

Хочу получить несколько выходных значений (возвратов) из распараллеленной функции в Python, но получаю ошибку сериализации (PicklingError). Обратите внимание: я также хотел бы видеть вывод print для x из подпроцессов.

%%time
from math import sqrt
from joblib import Parallel, delayed
# pip install joblib
import multiprocessing

# Number of CPUs reported by multiprocessing; passed as n_jobs to Parallel below.
num_cores = multiprocessing.cpu_count()

def producer():
  """Compute the multiples of 5 from 0 to 45 and their square roots.

  Each value and its square root are printed as they are produced.

  Returns:
    A tuple ``(roots, values)``: the list of square roots followed by the
    list of multiples of 5 they were computed from.
  """
  roots, values = [], []
  for step in range(10):
    value = step * 5
    print('Produced %s' % value)
    root = sqrt(value)
    print('Output sqrt %s' % root)
    values.append(value)
    roots.append(root)
  return roots, values

# NOTE(review): this is the failing line. `delayed(i)` wraps each of the two
# lists returned by producer() but is never called with arguments, so Parallel
# appears to receive bare function objects instead of (func, args, kwargs)
# tuples — consistent with the "'function' object is not iterable" TypeError
# in the traceback below. producer() itself is evaluated here in the calling
# process, not in the workers.
allOutput, allX = Parallel(n_jobs=num_cores)(delayed(i) for i in producer()) 

И выходные данные должны быть двумя списками с x*5 и sqrt(x*5).

Output sqrt 0.0
Produced 5
Output sqrt 2.23606797749979
Produced 10
Output sqrt 3.1622776601683795
Produced 15
Output sqrt 3.872983346207417
Produced 20
Output sqrt 4.47213595499958
Produced 25
Output sqrt 5.0
Produced 30
Output sqrt 5.477225575051661
Produced 35
Output sqrt 5.916079783099616
Produced 40
Output sqrt 6.324555320336759
Produced 45
Output sqrt 6.708203932499369
---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/joblib/externals/loky/backend/queues.py", line 151, in _feed
    obj, reducers=reducers)
  File "/usr/local/lib/python3.6/dist-packages/joblib/externals/loky/backend/reduction.py", line 145, in dumps
    p.dump(obj)
  File "/usr/local/lib/python3.6/dist-packages/joblib/parallel.py", line 290, in __getstate__
    for func, args, kwargs in self.items]
  File "/usr/local/lib/python3.6/dist-packages/joblib/parallel.py", line 290, in <listcomp>
    for func, args, kwargs in self.items]
TypeError: 'function' object is not iterable
"""

The above exception was the direct cause of the following exception:

PicklingError                             Traceback (most recent call last)
<ipython-input-29-ceae606b0c70> in <module>()
----> 1 get_ipython().run_cell_magic('time', '', "from math import sqrt\nfrom joblib import Parallel, delayed\n\ndef producer():\n  \n  for x in range(0, 10):\n    \n    x = x*5\n    print('Produced %s' % x)\n    output = sqrt(x)\n    print('Output sqrt %s' % output)\n    yield output, x\n\noutSqrt, outX = Parallel(n_jobs=2)(delayed(i) for i in producer()) \n")

/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py in run_cell_magic(self, magic_name, line, cell)
   2115             magic_arg_s = self.var_expand(line, stack_depth)
   2116             with self.builtin_trap:
-> 2117                 result = fn(magic_arg_s, cell)
   2118             return result
   2119 

</usr/local/lib/python3.6/dist-packages/decorator.py:decorator-gen-60> in time(self, line, cell, local_ns)

/usr/local/lib/python3.6/dist-packages/IPython/core/magic.py in <lambda>(f, *a, **k)
    186     # but it's overkill for just that one bit of state.
    187     def magic_deco(arg):
--> 188         call = lambda f, *a, **k: f(*a, **k)
    189 
    190         if callable(arg):

/usr/local/lib/python3.6/dist-packages/IPython/core/magics/execution.py in time(self, line, cell, local_ns)
   1191         else:
   1192             st = clock2()
-> 1193             exec(code, glob, local_ns)
   1194             end = clock2()
   1195             out = None

<timed exec> in <module>()

/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in __call__(self, iterable)
    994 
    995             with self._backend.retrieval_context():
--> 996                 self.retrieve()
    997             # Make sure that we get a last message telling us we are done
    998             elapsed_time = time.time() - self._start_time

/usr/local/lib/python3.6/dist-packages/joblib/parallel.py in retrieve(self)
    897             try:
    898                 if getattr(self._backend, 'supports_timeout', False):
--> 899                     self._output.extend(job.get(timeout=self.timeout))
    900                 else:
    901                     self._output.extend(job.get())

/usr/local/lib/python3.6/dist-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
    515         AsyncResults.get from multiprocessing."""
    516         try:
--> 517             return future.result(timeout=timeout)
    518         except LokyTimeoutError:
    519             raise TimeoutError()

/usr/lib/python3.6/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433             else:
    434                 raise TimeoutError()

/usr/lib/python3.6/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

PicklingError: Could not pickle the task to send it to the workers.

1 Ответ

1 голос
/ 06 марта 2019

Это дает нужные результаты. Однако вывод операторов print появляется не в исходном порядке.

import multiprocessing
import sys
from math import sqrt

from joblib import Parallel, delayed, parallel_backend
# pip install joblib

# Number of CPUs reported by multiprocessing; used as n_jobs for Parallel further down.
num_cores = multiprocessing.cpu_count()

def returnfunc(x, n):
  """Scale ``x`` by 5, take its square root, and report both values.

  Args:
    x: Number to scale; ``x * 5`` must be non-negative for ``sqrt``.
    n: Unused. Kept so existing ``delayed(returnfunc)(x, 10)`` calls keep
      working.

  Returns:
    The two-element list ``[x * 5, sqrt(x * 5)]``.

  Raises:
    ValueError: If ``x * 5`` is negative (raised by ``math.sqrt``).
  """
  x = x*5
  output = sqrt(x)

  # Emit both lines in a single write and flush *after* writing. The original
  # flushed before printing, which never pushed out this call's own output,
  # and its two separate prints made it easier for lines from different
  # workers to interleave. The text written is unchanged.
  sys.stdout.write('Produced %s\nOutput sqrt %s\n' % (x, output))
  sys.stdout.flush()

  return [x, output]

# Run returnfunc over the ten inputs on the process-based 'multiprocessing'
# backend, reusing the num_cores value computed above. The returned list
# follows the order of the inputs even when the workers' print output
# interleaves.
with parallel_backend('multiprocessing'):
  returnedLists = Parallel(n_jobs=num_cores)(delayed(returnfunc)(x, 10) for x in range(0, 10))
  # Flushes only this (parent) process's stdout buffer before the result
  # lists are printed; it cannot reorder what the workers have printed.
  sys.stdout.flush()

# Each result is [x, output]; split them into the two lists asked for.
print([pair[0] for pair in returnedLists])
print([pair[1] for pair in returnedLists])

Вывод:

Produced 0
Produced 5
Output sqrt 2.23606797749979
Output sqrt 0.0
Produced 10
Output sqrt 3.1622776601683795
Produced 15
Produced 20
Output sqrt 4.47213595499958
Produced 25
Output sqrt 5.0
Produced 30
Output sqrt 3.872983346207417
Output sqrt 5.477225575051661
Produced 35
Output sqrt 5.916079783099616
Produced 40
Output sqrt 6.324555320336759
Produced 45
Output sqrt 6.708203932499369
[0, 5, 10, 15, 20, 25, 30, 35, 40, 45]
[0.0, 2.23606797749979, 3.1622776601683795, 3.872983346207417, 4.47213595499958, 5.0, 5.477225575051661, 5.916079783099616, 6.324555320336759, 6.708203932499369]
...