Dask compute не работает при использовании клиента, работает при отсутствии настройки клиента - PullRequest
0 голосов
/ 06 ноября 2018

Я пытаюсь использовать клиент dask для распараллеливания вычислений. Когда я запускаю df.compute (), я получаю правильный вывод (хотя он очень медленный), но когда я запускаю то же самое после настройки клиента, я получаю следующую ошибку:

distributed.protocol.pickle - INFO - Failed to serialize <function part at 0x7fd5186ed730>. Exception: can't pickle _thread.RLock objects

Вот мой код, в первом df.compute () я получаю ожидаемый результат, во втором - нет.

@dask.delayed
def part(x):
    lower, upper = x
    q = "SELECT id,tfidf_vec,emb_vec FROM document_table"
    lines=man.session.execute(q)
    counter = lower
    df = []
    for line in lines:
        df.append(line)
        counter += 1
        if counter == upper:
            break
    return pd.DataFrame(df)

parts = [part(x) for x in [[0,100000],[100000,200000]]]
df = dd.from_delayed(parts)
df.compute()

from dask.distributed import Client
client = Client('127.0.0.1:8786')
df.compute()

1 Ответ

0 голосов
/ 06 ноября 2018

Ваша функция содержит ссылку на man.session, которая является частью функции closure . Когда вы используете планировщик по умолчанию, потоки, объект может быть разделен между потоками, которые исполняют ваш код. Когда вы используете распределенный планировщик, функция должна быть сериализована и отправлена ​​рабочим в процессе (ах) различий.

Вы должны создать функцию, которая создает объект сеанса при каждом вызове, как было предложено в качестве ответа на ваш очень похожий вопрос.

...