Для массива dask с большим объемом памяти размера = (M, N): Как выполнить повторное разбиение фрагментов с (= 1, N) на фрагменты = (M, 1)? - PullRequest
2 голосов
/ 24 марта 2019

Чтобы, например, применить БИХ-фильтр, закодированный в Numpy / Numba вдоль всей оси, мне нужно повторно разделить массив size=(M, N) dask с chunks=(m0, n0) до chunks=(m1, N) с m1 < m0.

Поскольку Dask избегает повторяющихся задач, во время rechunk-split / rechunk-merge в памяти будут храниться данные стоимостью (m0, N) (x 2?). Есть ли способ оптимизировать график, чтобы избежать такого поведения?

Я знаю, где можно найти информацию об оптимизации графиков Даска вручную. Но есть ли способ либо изменить политику планирования, чтобы позволить повторять задачи, либо (автоматически) перестроить график, чтобы минимизировать использование памяти во время этого повторного блока?

Вот минимальный пример (для крайнего случая, когда chunks=(M, 1)chunks=(1, N)):

from dask import array as da
from dask.distributed import Client

# limit memory to 4 GB
client = Client(memory_limit=4e9)

# Create 80 GB random array with chunks=(M, 1)
arr = da.random.uniform(-1, 1, size=(1e5, 1e5), chunks=(1e5, 1))

# Compute mean (This works!)
arr.mean().compute()

# Rechunk to chunks=(1, N)
arr = arr.rechunk((1, 1e5))

# Compute mean (This hits memory limit!)
arr.mean().compute()

Ответы [ 2 ]

2 голосов
/ 27 марта 2019

К сожалению, вы находитесь в худшем случае, вам нужно вычислить каждый входной блок, прежде чем вы сможете получить один выходной блок.

Операции повторного разбиения Dask вполне приемлемы, и в промежутке они будут перетекать вещи в блоки промежуточного размера, поэтому возможно, что это сработает в заполняемой памяти, но вы определенно будете писать вещина диск.

Короче говоря, в принципе нет ничего, что вы должны делать дополнительно.В теории алгоритмы повторного разбиения Dask должны справиться с этим.Если вы хотите, вы можете поиграть с ключевыми словами threshold= и block_size_limit= для повторного пересчета.

0 голосов
/ 29 марта 2019

Ключевое слово block_size_limit= приводит к некоторому решению.

(Ниже я использую меньший массив, потому что у меня не осталось 80 ГБ диска для разлива.)

from dask import array as da
from dask.distributed import Client

# limit memory to 1 GB
client = Client(n_workers=1, threads_per_worker=1, memory_limit=1e9)

# Create 3.2 GB array
arr = da.random.uniform(-1, 1, size=(2e4, 2e4), chunks=(2e4, 1e1))

# Check graph size
print(len(arr.__dask_graph__()), "nodes in graph")  # 2000 nodes

# Compute
print(arr.mean().compute())  # Takes 11.9 seconds. Doesn't spill.

# re-create array and rechunk with block_size_limit=1e3
arr = da.random.uniform(-1, 1, size=(2e4, 2e4), chunks=(2e4, 1e1))
arr = arr.rechunk((2e1, 2e4), block_size_limit=1e3)

# Check graph size
print(len(arr.__dask_graph__()), "nodes in graph")  # 32539 nodes

# Compute
print(arr.mean().compute())  # Takes 140 seconds, spills ~5GB to disk.

# re-create array and rechunk with default kwargs
arr = da.random.uniform(-1, 1, size=(2e4, 2e4), chunks=(2e4, 1e1))
arr = arr.rechunk((2e1, 2e4))

# Check graph size
print(len(arr.__dask_graph__()), "nodes in graph")  # 9206 nodes

# Compute
print(arr.mean().compute())  # Worker dies at 95% memory use
...