Принудительная локализация на подмножествах Dask Dataframe - PullRequest
5 голосов
/ 16 мая 2019

Я пытаюсь распределить большой Dask Dataframe по нескольким машинам для (более поздних) распределенных вычислений в dataframe.Я использую dask-распределенный для этого.

Все dask-распределенные примеры / документы, которые я вижу, заполняют начальную загрузку данных из сетевого ресурса (hdfs, s3 и т. Д.) И, кажется, не расширяютОптимизация DAG для части загрузки (кажется, предполагает, что загрузка сети является необходимым злом и просто съедает первоначальные затраты.) Это подчеркивается при ответе на другой вопрос: Общается ли Dask с HDFS для оптимизации для локальности данных?

Однако я могу видеть случаи, когда мы хотели бы этого.Например, если у нас есть сегментированная база данных + работники dask, размещенные на узлах этой БД, мы бы хотели, чтобы записи только из локального сегмента заполнялись в локальных работниках dask.Судя по документации / примерам, кризис сети кажется неизбежно предполагаемой стоимостью. Можно ли заставить отдельные части данных получать данные от определенных работников?

Альтернатива, которую я пробовал, состоит в том, чтобы заставить каждого работника запускать функцию (итеративно передается каждому работнику), где функция загружает только данные, локальные для этого компьютера / сегмента.Это работает, и у меня есть несколько оптимально локальных фреймов данных с той же схемой столбцов - однако - теперь у меня нет ни одного фрейма данных, а n фреймов данных. Возможно ли объединить / объединить кадры данных на нескольких машинах, чтобы была одна ссылка на один кадр данных, но части имеют сходство (в пределах разумного, как определено в задаче DAG) с конкретными машинами?

Ответы [ 3 ]

3 голосов
/ 16 мая 2019

Вы можете создавать «коллекции» dask, такие как фреймы данных, из фьючерсов и отложенных объектов, которые хорошо взаимодействуют друг с другом.

Для каждого раздела, где вы знаете, какая машина должна его загружать, вы можете создать будущее следующим образом:

f = c.submit(make_part_function, args, workers={'my.worker.ip'})

, где c - это клиент dask, а адрес - это машина, на которой вы хотите, чтобы это произошло. Вы также можете указать allow_other_workers=True, если это предпочтение, а не требование.

Чтобы сделать фрейм данных из списка таких фьючерсов, вы могли бы сделать

df = dd.from_delayed([dask.delayed(f) for f in futures])

и в идеале укажите meta=, дающий описание ожидаемого кадра данных. Теперь дальнейшие операции в данном разделе предпочтительнее планировать на том же рабочем, который уже хранит данные.

1 голос
/ 18 мая 2019

Меня также интересует возможность ограничивать вычисления конкретным узлом (и данными, локализованными для этого узла). Я попытался реализовать вышеизложенное с помощью простого сценария (см. Ниже), но, глядя на результирующий фрейм данных, выдает ошибку (из dask / dataframe / utils.py :: check_meta ()):

ValueError: Metadata mismatch found in `from_delayed`.

Expected partition of type `DataFrame` but got `DataFrame`

Пример:

from dask.distributed import Client
import dask.dataframe as dd
import dask

client = Client(address='<scheduler_ip>:8786')
client.restart()

filename_1 = 'http://samplecsvs.s3.amazonaws.com/Sacramentorealestatetransactions.csv'
filename_2 = 'http://samplecsvs.s3.amazonaws.com/SalesJan2009.csv'

future_1 = client.submit(dd.read_csv, filename_1, workers='w1')
future_2 = client.submit(dd.read_csv, filename_2, workers='w2')

client.has_what()
# Returns: {'tcp://<w1_ip>:41942': ('read_csv-c08b231bb22718946756cf46b2e0f5a1',),
#           'tcp://<w2_ip>:41942': ('read_csv-e27881faa0f641e3550a8d28f8d0e11d',)}

df = dd.from_delayed([dask.delayed(f) for f in [future_1, future_2]])

type(df)
# Returns: dask.dataframe.core.DataFrame

df.head()
# Returns:
#      ValueError: Metadata mismatch found in `from_delayed`.
#      Expected partition of type `DataFrame` but got `DataFrame`

Примечание В среде dask есть два рабочих узла (с псевдонимом w1 и w2) - узел планировщика, и сценарий выполняется на внешнем хосте. dask == 1.2.2, распределенный == 1.28.1

0 голосов
/ 18 мая 2019

Странно вызывать многие функции dask dataframe параллельно. Возможно, вы хотели вместо этого вызывать множество вызовов Pandas read_csv параллельно?

# future_1 = client.submit(dd.read_csv, filename_1, workers='w1')
# future_2 = client.submit(dd.read_csv, filename_2, workers='w2')
future_1 = client.submit(pandas.read_csv, filename_1, workers='w1')
future_2 = client.submit(pandas.read_csv, filename_2, workers='w2')

См. https://docs.dask.org/en/latest/delayed-best-practices.html#don-t-call-dask-delayed-on-other-dask-collections для получения дополнительной информации

...