Могу ли я использовать dask.delayed для функции, завернутой в ctypes? - PullRequest
1 голос
/ 31 марта 2020

Цель состоит в том, чтобы использовать dask.delayed для распараллеливания некоторых «смущающе параллельных» разделов моего кода. Код включает в себя вызов функции python, которая оборачивает c -функцию, используя ctypes. Чтобы понять ошибки, которые я получал, я написал очень простой пример c.

c -функция:

double zippy_sum(double x, double y)
{
return x + y;
}

python:

from dask.distributed import Client
client = Client(n_workers = 4)
client

import os
import dask
import ctypes

current_dir = os.getcwd() #os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double

def zippy(x, y):

    z = _zippy_sum(x, y)

    return z

result = dask.delayed(zippy)(1., 2.)
result.compute()

Traceback:

- -------------------------------------------------- ----------------------- KeyError Traceback (последний вызов был последним) ~ / .edm / envs / evaxi3.6 / lib / python3 .6 / site-packages / distrib / worker.py в функции dumps_function (fun c) 3286 с _cache_lock: -> 3287 result = cache_dumps [func] 3288 за исключением KeyError:

~ / .edm / envs / evaxi3.6 /lib/python3.6/site-packages/distributed/utils.py в getitem (self, key) 1517 def getitem (self, key): -> значение 1518 = super (). getitem (ключ) 1519 self.data.move_to_end (ключ)

~ / .edm / envs / evaxi3.6 / lib / python3 .6 / collection /init.py в getitem (self, key) 990 возвращает self. class . отсутствует (self, key) -> 991 повысить KeyError (ключ) 992 def setitem (self, key, item): self.data [key] = item

KeyError: функция zippy в 0x11ffc50d0

во время ха При отмене вышеупомянутого исключения произошло другое исключение:

ValueError Traceback (последний вызов был последним) ~ / .edm / envs / evaxi3.6 / lib / python3 .6 / site-packages / distrib / protocol /pickle.py в дампах (x) 40, если b " main " в результате: ---> 41 вернуть cloudpickle.dumps (x, protocol = pickle.HIGHEST_PROTOCOL) 42 else:

~ / .edm / envs / evaxi3.6 / lib / python3 .6 / site-packages / cloudpickle / cloudpickle.py в дампах (obj, протокол) 1147 cp = CloudPickler (файл, протокол = протокол) -> 1148 cp.dump (obj) 1149 возвращает file.getvalue ()

~ / .edm / envs / evaxi3.6 / lib / python3 .6 / site-packages / cloudpickle / cloudpickle.py в дамп ( self, obj) 490 try: -> 491 возвращает Pickler.dump (self, obj) 492 кроме RuntimeError как e:

~ / .edm / envs / evaxi3.6 / lib / python3 .6 /pickle.py в дампе (self, obj) 408 self.framer.start_framing () -> 409 self.save (obj) 410 self.write (STOP)

~ / .edm / envs / evaxi3 .6 / lib / python3 .6 / pickle.py в save (self, obj, save_persistent_id) 475, если f не None: -> 476 f (self, o bj) # Вызвать несвязанный метод с явным self 477 return

~ / .edm / envs / evaxi3.6 / lib / python3 .6 / site-packages / cloudpickle / cloudpickle.py в save_function (self, obj, name) 565 else: -> 566 return self.save_function_tuple (obj) 567

~ / .edm / envs / evaxi3.6 / lib / python3 .6 / site-packages / cloudpickle / cloudpickle.py в save_function_tuple (self, fun c) 779 состояние ['kwdefaults'] = fun c. kwdefaults -> 780 сохранение (состояние) 781 запись (pickle.TUPLE)

~ / .edm / envs / evaxi3.6 / lib / python3 .6 / pickle.py в save (self, obj, save_persistent_id) 475, если f не None: -> 476 f (self, obj) # Вызвать несвязанный метод с явным self 477 return

~ / .edm / envs / evaxi3.6 / lib / python3 .6 / pickle.py в save_dict (self, obj) 820 self.memoize (obj) -> 821 self._batch_setitems (obj.items ()) 822

~ / .edm / envs / evaxi3.6 / lib / python3 .6 / pickle.py в _batch_setitems (self , items) 846 save (k) -> 847 save (v) 848 write (SETITEMS)

~ / .edm / envs / evaxi3.6 / lib / python3 .6 / pickle. py в save (self, obj, save_persistent_id) 475, если f не равно None: -> 476 f (self, obj) # Вызов несвязанного метода с явным self 477 return

~ / .edm / envs / evaxi3 .6 / lib / python3 .6 / pickle.py в save_dict (self, obj) 820 self.memoize (obj) -> 821 self._batch_setitems (obj.items ()) 822

~ /.edm/envs/evaxi3.6/lib/python3.6/pickle.py в _batch_setitems (self, items) 851 save (k) -> 852 save (v) 853 write (SETITEM)

~ / .edm / envs / evaxi3.6 / lib / python3 .6 / pickle.py в save (self, obj, save_persistent_id) 495, если уменьшение не равно None: -> 496 rv = уменьшить (self. proto) 497 else:

ValueError: объекты ctypes, содержащие указатели, не могут быть засечены

К сожалению, я до сих пор не понимаю ошибок! Я только начинаю с dask и имею только базовый c опыт работы с ctypes. Есть ли у кого-нибудь предложения о том, как решить эту проблему, или даже понять, что нужно решать?

Спасибо!

1 Ответ

0 голосов
/ 31 марта 2020

Действительно, вы не можете сериализовать функцию, ссылающуюся на C -функцию в замыкании или аргументах. Однако, если ваша функция находится в модуле, который доступен всем работникам, то вы в конечном итоге сериализуете только имя модуля, и python делает все правильно.

module zippy.py (где-то на вашем python PATH, возможно текущий каталог для примера):

import os
import dask
import ctypes

current_dir = os.getcwd() #os.path.abspath(os.path.dirname(__file__))
_mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

_zippy_sum = _mod.zippy_sum
_zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
_zippy_sum.restype = ctypes.c_double

def zippy(x, y):

    z = _zippy_sum(x, y)

    return z

основной скрипт:

from dask.distributed import Client
import zippy
if __name__ == "__main__":
    # if running as a script, this is helpful
    client = Client(n_workers = 4)

result = dask.delayed(zippy.zippy)(1., 2.)
result.compute()

Другое решение, если вы не хотите создавать модуль, это сделать все ваши C импорта и определения внутри функции.

def zippy(x, y):
    _mod = ctypes.cdll.LoadLibrary(os.path.join(current_dir, "zippy.so"))

    _zippy_sum = _mod.zippy_sum
    _zippy_sum.argtypes = [ctypes.c_double, ctypes.c_double]
    _zippy_sum.restype = ctypes.c_double

    z = _zippy_sum(x, y)

    return z
...