Как нормализовать массив памяти, превышающий объем памяти? - PullRequest
1 голос
/ 30 апреля 2019

Я пытаюсь нормализовать массив dask с уменьшением самого себя (например, b = a / a.sum() с a и b, являющимися массивами dask).Вычисление этого нормализованного массива сначала вычислит все необходимое, чтобы узнать исходный массив, и только затем вычислит деления и, следовательно, попадет на диск, если памяти недостаточно.

Пример кода:

from dask.distributed import Client
from dask import arry as da

# Create 1000 MB array full of 1's of with chunks of 50MB
a = da.ones(shape=(1/8 * 1000e6, 1), chunks=(1/8 * 50e6, 1))

# Create normalized array with sum = 1
b = a / a.sum()

# Create cluster to small for all of a or b at once
client = Client(n_workers=1, threads_per_worker=1, memory_limit=500e6)

# Compute sum of b  (Spills to disk)
print(b.sum().compute())

Есть ли что-то вроде следующего?

b = a / same_as_a_but_different_tasks.sum()

1 Ответ

0 голосов
/ 02 мая 2019

Я решил это, скопировав массив и переименовав все задачи в верхнем слое:

from copy import deepcopy


def copy_with_renamed_top_layer(a, prepend_name="copy-of-"):
    # copy array and dask
    b = a.copy()
    b.dask = deepcopy(b.dask)

    # get new name
    orig_name = a.name
    new_name = prepend_name + orig_name

    # rename dependencies
    b.dask.dependencies[new_name] = b.dask.dependencies.pop(orig_name)

    # rename tasks of uppermost layer
    b.dask.layers[new_name] = b.dask.layers.pop(orig_name)
    b.dask.layers[new_name] = {
        (new_name, ) + k[1:]: v
        for k, v in b.dask.layers[new_name].items()
    }

    # rename array
    b.name = new_name

    return b


# Create 1000 MB array full of 1's of with chunks of 50MB
a = da.ones(shape=(1/8 * 1000e6, 1), chunks=(1/8 * 50e6, 1))

# copy and rename uppermost layer
a_copy = copy_with_renamed_top_layer(a)

# Create normalized array with sum = 1
b = a / a_copy.sum()

Это, однако, очень хрупкое решение, поскольку оно опирается на текущий внутренний API.

...