У меня есть динамический кластер Dask Kubernetes.Я хочу загрузить 35 файлов паркета (около 1,2 ГБ) из хранилища Gcloud в Dask Dataframe, затем обработать его с apply()
и после сохранения результата в файл паркета в Gcloud.
Во время загрузки файлов из хранилища Gcloud,Использование памяти кластера увеличивается до 3-4 ГБ.Затем рабочие (у каждого рабочего есть 2 ГБ ОЗУ) завершаются / перезапускаются, и некоторые задачи теряются, поэтому кластер начинает вычислять одни и те же вещи по кругу.Я удалил операцию apply()
и оставил только read_parquet()
, чтобы проверить, вызывает ли мой пользовательский код проблему, но проблема была такой же, даже с одной операцией read_parquet()
.Это код:
client = Client('<ip>:8786')
client.restart()
def command():
client = get_client()
df = dd.read_parquet('gcs://<bucket>/files/name_*.parquet', storage_options={'token':'cloud'}, engine='fastparquet')
df = df.compute()
x = client.submit(command)
x.result()
Примечание. Я отправляю одну командную функцию для запуска всех необходимых команд во избежание проблем с аутентификацией gcsfs внутри кластера
После некоторого исследования я понял, что проблема может быть в .compute()
, который возвращает все данные процессу, но этот процесс (моя командная функция) выполняется на рабочем месте.Из-за этого работнику не хватает оперативной памяти, происходит сбой и потеря всех вычисленных задач, что вызывает повторное выполнение задач.
Моя цель:
- для чтения из файлов паркета
- выполнить некоторые вычисления с
apply()
- и даже без возврата данных из кластера записать их обратно в хранилище Gcloud в формате паркета.
Итак, я простохотите сохранить данные в кластере и не возвращать их обратно.Просто вычислите и сохраните данные в другом месте.
После прочтения распределенных документов Dask я нашел методы client.persist()/compute()
и .scatter()
.Они выглядят так, как мне нужно, но я не совсем понимаю, как их использовать.
Не могли бы вы, пожалуйста, помочь мне с методами client.persist()
и client.compute()
для моего примера или предложить другой способ сделатьЭто?Большое спасибо!
Версия Dask: 0.19.1
Распределенная версия Dask: 1.23.1
Версия Python: 3.5.1