Dask и numpy - медленное преобразование между массивом numpy и массивом dask - PullRequest
0 голосов
/ 20 февраля 2020

Мне нужно сохранить массив dask из большого массива numpy. Ниже приведен минимальный рабочий пример, демонстрирующий процесс. Обратите внимание, что a создается с numpy.random только для этого mwe, к сожалению, я не могу создать массив с dask.

import numpy as np
import dask.array as da
from dask.distributed import Client

a = numpy.random.randint(0,2,size=4000000*8000).reshape((4000000,8000))
# here the conversion and saving
client = Client(n_workers=90, threads_per_worker=20, processes=True)
dask_array = da.from_array( a, chunks = 100000)
da.to_npy_stack(‘my-folder/’, dask_array)
client.close()

Проблема, с которой я сталкиваюсь, заключается в том, что a в памяти занимает около 100 ГБ, однако при выполнении части dask используемая память начинает увеличиваться до тех пор, пока почти не заполнит доступную оперативную память, то есть более 300 ГБ. Затем он выполняет некоторые вычисления, и через некоторое время я получил ошибку памяти (например, 10 минут). Мне нужен массив, сохраненный в dask, так как у меня есть другой конвейер (который не может быть подключен напрямую к этому конвейеру), использующий массивы dask, и для чтения массива dask из памяти требуется файл info (если есть какой-либо другой метод для дампа массив и создать info файл, который я открываю, чтобы попробовать его).

Любые предложения о том, как ускорить и решить эту задачу?

Ответы [ 2 ]

1 голос
/ 23 февраля 2020

Если вы работаете на одной машине, я рекомендую использовать стандартный планировщик с резьбой, а не dask.distributed.Client. Таким образом вы сохраните все данные в одном и том же процессе и избавите от необходимости делать копии большого массива Numpy.

1 голос
/ 20 февраля 2020

Создание всех ваших данных в главном процессе, а затем загрузка их в рабочие процессы - плохая идея! Вы всегда должны стараться загружать / создавать данные непосредственно в рабочих, что будет: а) избегать повторения работы и копирования данных и б) сохранять данные ленивыми, только материализуя их в памяти при необходимости:

В этом случае это может выглядеть как

arr = da.random.randint(0, 2, size=4000000*8000, chunks=100000).reshape((4000000,8000))
...