Как добиться параллелизма в блокноте Databricks? - PullRequest
0 голосов
/ 01 ноября 2019

Я хочу иметь параллелизм в Databricks, что означает, что мой код будет использовать ядра процессора для вычисления не потока ядра. Для этого я использовал многопроцессорную обработку -

import multiprocessing as mp
from multiprocessing.pool import ThreadPool

def hello(name):
  print("Hello " + name + "\n")

if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count())
    # pool = ThreadPool(mp.cpu_count()) # this works in both Databricks and local computer
    pool.map(hello, ["spark", "databricks"])
    pool.close()

Но это выдает мне ошибку - PicklingError: Can't pickle <function hello at 0x7fc136cdbd08>: attribute lookup hello on __main__ failed

Трассировка всего стека -

---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
<command-500228233074730> in <module>
      8     pool = mp.Pool(mp.cpu_count())
      9     # pool = ThreadPool(mp.cpu_count())
---> 10     pool.map(hello, ["spark", "databricks"])
     11     pool.close()

/usr/lib/python3.7/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    266         in a list that is returned.
    267         '''
--> 268         return self._map_async(func, iterable, mapstar, chunksize).get()
    269 
    270     def starmap(self, func, iterable, chunksize=None):

/usr/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    655             return self._value
    656         else:
--> 657             raise self._value
    658 
    659     def _set(self, i, obj):

/usr/lib/python3.7/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    429                         break
    430                     try:
--> 431                         put(task)
    432                     except Exception as e:
    433                         job, idx = task[:2]

/usr/lib/python3.7/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207 
    208     def recv_bytes(self, maxlength=None):

/usr/lib/python3.7/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     49     def dumps(cls, obj, protocol=None):
     50         buf = io.BytesIO()
---> 51         cls(buf, protocol).dump(obj)
     52         return buf.getbuffer()
     53 

PicklingError: Can't pickle <function hello at 0x7fc136cdbd08>: attribute lookup hello on __main__ failed

Хотя при запускетот же код на моем локальном компьютере, он работает нормально (вы можете проверить тоже).

Как решить эту проблему?

...