Распределенная библиотека Dask с ошибкой сериализации - PullRequest
1 голос
/ 08 апреля 2020

Я инициализировал кластер с 10 работниками и 4 потоками на каждого работника, и у меня есть 12-ядерный ноутбук, на котором я запускаю это.

    cluster = makeIndividualDashboard.LocalCluster(n_workers=10, threads_per_worker=4)
    client = makeIndividualDashboard.Client()
    runOna(client)
    client.shutdown()

ниже приведен код, где я выполняю кластерные вычисления.

    st = settings.as_dict()
    new_settings = namedtuple("Settings", st.keys())(*st.values())
    to_process = []
    client.cluster.scale(10)
    if mongoConnection:
        mongo_c = True
    else:
        mongo_c = None
    future = client.scatter([net, new_settings, avgNodesConnected, kcoreByGroup, averageTeamDensity,
                             edgesInByAttributeTableMeans, edgesInByAttributeTable, crossTeamTiesTable,
                             descendentLookup, groupDegreeTable, respondentDegreeTable, degreeTable,
                             orgTeamTree, teamMembership, graphId, selectionRange, criteria,
                             onlyForNodes, hashIds, useEnvironment, rollupToLeaders, averageTeamSize,
                             meanCrossInTiesPct, meanCrossOutTiesPct, meanCrossAllTiesPct, mongo_c])
    for node in nodes:
        if FILTER_FOR_USER == None or node == FILTER_FOR_USER:
            to_process.append(dask.delayed(run_me)(node, *future))

    dask.compute(*to_process)

да, это выглядит немного грязно, потому что run_me - очень большая функция, на данный момент я не могу модульный лучше в будущем, может быть, я буду. проблема в том, что эта работа отлично работает, если у меня только 5 рабочих или меньше, но как только я увеличиваю количество рабочих, это вызывает ошибку сериализации.

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
    for key, value in data.items()
  File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
    if type(value) is Serialize
  File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
TypeError: ('Could not serialize object of type float64.', '0.68')
distributed.comm.utils - ERROR - ('Could not serialize object of type float64.', '0.68')

Опять же, это очень странно, потому что если я запускаю это на сервере Linux, который имеет 35 ядер, и я помещаю количество рабочих, 30 работает нормально, не знаю, в чем проблема. это указано c для моего местного ?? я могу искать проблему сериализации, но почему это работает только с 5 работниками ??

Заранее спасибо за любую помощь.

1 Ответ

3 голосов
/ 12 апреля 2020

Ошибка говорит о том, что есть какой-то объект, который вы пытаетесь отправить работнику, который не сериализуем. Тип float64, который может быть numpy .float64 объект? Я не знаю, учитывая то, что вы сказали. Я убедился, что Dask отлично перемещается Numpy объектов float64

In [1]: from dask.distributed import Client                                                                                                                                                                                         

In [2]: client = Client()                                                                                                                                                                                                           

In [3]: import numpy as np                                                                                                                                                                                                          

In [4]: x = np.float64(1)                                                                                                                                                                                                           

In [5]: future = client.scatter(x)                                                                                                                                                                                                  

In [6]: future.result()                                                                                                                                                                                                             
Out[6]: 1.0

Я рекомендую вам предоставить MCVE. См { ссылка }

...