Лучший способ распараллелить вычисления над блоками dask, которые не возвращают массив np? - PullRequest
1 голос
/ 23 марта 2020

Я хотел бы вернуть кадр данных dask из перекрывающегося вычисления массива dask, где каждый блок вычислений возвращает кадр данных pandas. В приведенном ниже примере показан один из способов сделать это, упрощенный для демонстрационных целей. Я нашел комбинацию da.overlap.overlap и to_delayed().ravel() как способную выполнить работу, если я передам соответствующий ключ блока и информацию о чанке.

Редактировать: Спасибо @AnnaM, который обнаружил ошибки в оригинальном сообщении, а затем сделал его общим! Основываясь на ее комментариях, я включаю обновленную версию кода. Кроме того, отвечая на интерес Анны к использованию памяти, я убедился, что это, кажется, не занимает больше памяти, чем наивно ожидалось.

def extract_features_generalized(chunk, offsets, depth, columns):
    shape = np.asarray(chunk.shape)
    offsets = np.asarray(offsets)
    depth = np.asarray(depth)
    coordinates = np.stack(np.nonzero(chunk)).T     
    keep = ((coordinates >= depth) & (coordinates < (shape - depth))).all(axis=1)    
    data = coordinates + offsets - depth
    df = pd.DataFrame(data=data, columns=columns)
    return df[keep]

def my_overlap_generalized(data, chunksize, depth, columns, boundary):
    data = data.rechunk(chunksize)
    data_overlapping_chunks = da.overlap.overlap(data, depth=depth, boundary=boundary)

    dfs = []
    for block in data_overlapping_chunks.to_delayed().ravel():
        offsets = np.array(block.key[1:]) * np.array(data.chunksize)
        df_block = dask.delayed(extract_features_generalized)(block, offsets=offsets, 
                                                              depth=depth, columns=columns)
        dfs.append(df_block)

    return dd.from_delayed(dfs)

data = np.zeros((2,4,8,16,16))  
data[0,0,4,2,2] = 1
data[0,1,4,6,2] = 1
data[1,2,4,8,2] = 1
data[0,3,4,2,2] = 1

arr = da.from_array(data)
df = my_overlap_generalized(arr, 
                            chunksize=(-1,-1,-1,8,8), 
                            depth=(0,0,0,2,2), 
                            columns=['r', 'c', 'z', 'y', 'x'],
                            boundary=tuple(['reflect']*5))
df.compute().reset_index()

- Остаток оригинального сообщения, включая оригинальные ошибки -

Мой пример только перекрывает xy, но его легко обобщить. Есть ли что-нибудь ниже, что является неоптимальным или может быть сделано лучше? Может ли что-нибудь сломаться, потому что оно полагается на информацию низкого уровня, которая может измениться (например, ключ блока)?

def my_overlap(data, chunk_xy, depth_xy):
    data = data.rechunk((-1,-1,-1, chunk_xy, chunk_xy))
    data_overlapping_chunks = da.overlap.overlap(data, 
                                                 depth=(0,0,0,depth_xy,depth_xy), 
                                                 boundary={3: 'reflect', 4: 'reflect'})

    dfs = []
    for block in data_overlapping_chunks.to_delayed().ravel():
        offsets = np.array(block.key[1:]) * np.array(data.chunksize)
        df_block = dask.delayed(extract_features)(block, offsets=offsets, depth_xy=depth_xy)
        dfs.append(df_block)

    # All computation is delayed, so downstream comptutions need to know the format of the data.  If the meta
    # information is not specified, a single computation will be done (which could be expensive) at this point
    # to infer the metadata.
    # This empty dataframe has the index, column, and type information we expect in the computation.
    columns = ['r', 'c', 'z', 'y', 'x']

    # The dtypes are float64, except for a small number of columns
    df_meta = pd.DataFrame(columns=columns, dtype=np.float64)
    df_meta = df_meta.astype({'c': np.int64, 'r': np.int64})
    df_meta.index.name = 'feature'

    return dd.from_delayed(dfs, meta=df_meta)

def extract_features(chunk, offsets, depth_xy):
    r, c, z, y, x = np.nonzero(chunk) 
    df = pd.DataFrame({'r': r, 'c': c, 'z': z, 'y': y+offsets[3]-depth_xy, 
                       'x': x+offsets[4]-depth_xy})
    df = df[(df.y > depth_xy) & (df.y < (chunk.shape[3] - depth_xy)) &
            (df.z > depth_xy) & (df.z < (chunk.shape[4] - depth_xy))]
    return df

data = np.zeros((2,4,8,16,16))  # round, channel, z, y, x
data[0,0,4,2,2] = 1
data[0,1,4,6,2] = 1
data[1,2,4,8,2] = 1
data[0,3,4,2,2] = 1
arr = da.from_array(data)
df = my_overlap(arr, chunk_xy=8, depth_xy=2)
df.compute().reset_index()

1 Ответ

1 голос
/ 26 марта 2020

Прежде всего, спасибо за размещение вашего кода. Я работаю над аналогичной проблемой, и это действительно помогло мне.

При тестировании вашего кода я обнаружил несколько ошибок в функции extract_features, которые не позволяют вашему коду возвращать правильные индексы.

Вот исправленная версия:

def extract_features(chunk, offsets, depth_xy):
    r, c, z, y, x = np.nonzero(chunk) 
    df = pd.DataFrame({'r': r, 'c': c, 'z': z, 'y': y, 'x': x})
    df = df[(df.y >= depth_xy) & (df.y < (chunk.shape[3] - depth_xy)) &
            (df.x >= depth_xy) & (df.x < (chunk.shape[4] - depth_xy))]
    df['y'] = df['y'] + offsets[3] - depth_xy
    df['x'] = df['x'] + offsets[4] - depth_xy
    return df

Обновленный код теперь возвращает индексы, которые были установлены в 1:

   index  r  c  z  y  x
0      0  0  0  4  2  2
1      1  0  1  4  6  2
2      2  0  3  4  2  2
3      1  1  2  4  8  2

Для сравнения это вывод оригинальной версии:

   index  r  c  z  y  x
0      1  0  1  4  6  2
1      3  1  2  4  8  2
2      0  0  1  4  6  2
3      1  1  2  4  8  2

Возвращает строки номер 2 и 4, по два раза в каждой.

Причина, по которой это происходит, - три ошибки в функции extract_features:

  1. Сначала вы добавляете смещение и вычитаете глубину, а затем отфильтровывает перекрывающиеся детали: порядок необходимо поменять местами
  2. df.y > depth_xy следует заменить на df.y >= depth_xy
  3. df.z следует заменить на df.x, так как это измерение x имеет перекрытие

Чтобы еще больше оптимизировать это, вот обобщенная версия кода, которая будет работать для произвольное число измерений:

def extract_features_generalized(chunk, offsets, depth, columns):
    coordinates = np.nonzero(chunk) 
    df = pd.DataFrame()
    rows_to_keep = np.ones(len(coordinates[0]), dtype=int)
    for i in range(len(columns)):
        df[columns[i]] = coordinates[i]
        rows_to_keep = rows_to_keep * np.array((df[columns[i]] >= depth[i])) * \
                     np.array((df[columns[i]] < (chunk.shape[i] - depth[i])))
        df[columns[i]] = df[columns[i]] + offsets[i] - depth[i]
    del coordinates
    return df[rows_to_keep > 0]

def my_overlap_generalized(data, chunksize, depth, columns):
    data = data.rechunk(chunksize)
    data_overlapping_chunks = da.overlap.overlap(data, depth=depth, 
                                                 boundary=tuple(['reflect']*len(columns)))

    dfs = []
    for block in data_overlapping_chunks.to_delayed().ravel():
        offsets = np.array(block.key[1:]) * np.array(data.chunksize)
        df_block = dask.delayed(extract_features_generalized)(block, offsets=offsets, 
                                                              depth=depth, columns=columns)
        dfs.append(df_block)

    return dd.from_delayed(dfs)

data = np.zeros((2,4,8,16,16))  
data[0,0,4,2,2] = 1
data[0,1,4,6,2] = 1
data[1,2,4,8,2] = 1
data[0,3,4,2,2] = 1

arr = da.from_array(data)
df = my_overlap_generalized(arr, chunksize=(-1,-1,-1,8,8), 
                            depth=(0,0,0,2,2), columns=['r', 'c', 'z', 'y', 'x'])
df.compute().reset_index()
...