Dask Распределенный планировщик и большие функции - PullRequest
1 голос
/ 04 мая 2020

В контексте распределенного планировщика Dask без LocalCluster: Может ли кто-нибудь помочь мне понять динамику использования большой (кучной) функции отображения?

Например, рассмотрим кадр данных Dask ddf и операцию map_partitions:

def mapper():
  resource=... #load some large resource eg 50MB

  def inner(pdf):
    return pdf.apply(lambda x: ..., axis=1)

  return inner

mapper_fn = mapper() #50MB on heap
ddf.map_partitions(mapper_fn)

Что здесь происходит? Dask будет сериализовать mapper_fn и отправлять на все задания? Скажем, у меня есть n разделов, так что n задач.

Опытным путем, я заметил, что если у меня есть 40 задач и маппер 50 МБ, то для начала работы требуется около 70 секунд, кластер, похоже, сидит там с полным процессором, но на приборной панели отображается ничего. Что здесь происходит? Каковы последствия наличия больших (heap) функций в распределенном планировщике di sh?

1 Ответ

1 голос
/ 08 мая 2020

Dask сериализует нетривиальные функции с помощью cloudpickle и включает сериализованную версию этих функций в каждую задачу. Это крайне неэффективно. Мы рекомендуем не делать этого, а вместо этого передавать данные явно.

resource = ...

ddf.map_partitions(func, resource=resource)

Это будет намного эффективнее.

...