Dask распределенного выполнения вычислений без возврата данных - PullRequest
0 голосов
/ 07 октября 2018

У меня есть динамический кластер Dask Kubernetes.Я хочу загрузить 35 файлов паркета (около 1,2 ГБ) из хранилища Gcloud в Dask Dataframe, затем обработать его с apply() и после сохранения результата в файл паркета в Gcloud.

Во время загрузки файлов из хранилища Gcloud,Использование памяти кластера увеличивается до 3-4 ГБ.Затем рабочие (у каждого рабочего есть 2 ГБ ОЗУ) завершаются / перезапускаются, и некоторые задачи теряются, поэтому кластер начинает вычислять одни и те же вещи по кругу.Я удалил операцию apply() и оставил только read_parquet(), чтобы проверить, вызывает ли мой пользовательский код проблему, но проблема была такой же, даже с одной операцией read_parquet().Это код:

client = Client('<ip>:8786')
client.restart()

def command():
    client = get_client()
    df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
    df = df.compute()

x = client.submit(command)
x.result()

Примечание. Я отправляю одну командную функцию для запуска всех необходимых команд во избежание проблем с аутентификацией gcsfs внутри кластера

После некоторого исследования я понял, что проблема может быть в .compute(), который возвращает все данные процессу, но этот процесс (моя командная функция) выполняется на рабочем месте.Из-за этого работнику не хватает оперативной памяти, происходит сбой и потеря всех вычисленных задач, что вызывает повторное выполнение задач.

Моя цель:

  • для чтения из файлов паркета
  • выполнить некоторые вычисления с apply()
  • и даже без возврата данных из кластера записать их обратно в хранилище Gcloud в формате паркета.

Итак, я простохотите сохранить данные в кластере и не возвращать их обратно.Просто вычислите и сохраните данные в другом месте.

После прочтения распределенных документов Dask я нашел методы client.persist()/compute() и .scatter().Они выглядят так, как мне нужно, но я не совсем понимаю, как их использовать.

Не могли бы вы, пожалуйста, помочь мне с методами client.persist() и client.compute() для моего примера или предложить другой способ сделатьЭто?Большое спасибо!

Версия Dask: 0.19.1

Распределенная версия Dask: 1.23.1

Версия Python: 3.5.1

1 Ответ

0 голосов
/ 08 октября 2018
df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
df = df.compute()  # this triggers computations, but brings all of the data to one machine and creates a Pandas dataframe

df = df.persist()  # this triggers computations, but keeps all of the data in multiple pandas dataframes spread across multiple machines
...