Могу ли я распараллелить `numpy.bincount` с помощью` xarray.apply_ufunc`? - PullRequest
0 голосов
/ 10 апреля 2019

Я хочу распараллелить функцию numpy.bincount, используя apply_ufunc API xarray, и я попробовал следующий код:

import numpy as np
import xarray as xr
da = xr.DataArray(np.random.rand(2,16,32),
                  dims=['time', 'y', 'x'],
                  coords={'time': np.array(['2019-04-18', '2019-04-19'],
                                          dtype='datetime64'), 
                         'y': np.arange(16), 'x': np.arange(32)})

f = xr.DataArray(da.data.reshape((2,512)),dims=['time','idx'])
x = da.x.values
y = da.y.values
r = np.sqrt(x[np.newaxis,:]**2 + y[:,np.newaxis]**2)
nbins = 4
if x.max() > y.max():
    ri = np.linspace(0., y.max(), nbins)
else:
    ri = np.linspace(0., x.max(), nbins)

ridx = np.digitize(np.ravel(r), ri)

func = lambda a, b: np.bincount(a, weights=b)
xr.apply_ufunc(func, xr.DataArray(ridx,dims=['idx']), f)

, но я получаю следующую ошибку:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-203-974a8f0a89e8> in <module>()
     12 
     13 func = lambda a, b: np.bincount(a, weights=b)
---> 14 xr.apply_ufunc(func, xr.DataArray(ridx,dims=['idx']), f)

~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_ufunc(func, *args, **kwargs)
    979                                      signature=signature,
    980                                      join=join,
--> 981                                      exclude_dims=exclude_dims)
    982     elif any(isinstance(a, Variable) for a in args):
    983         return variables_ufunc(*args)

~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_dataarray_ufunc(func, *args, **kwargs)
    208 
    209     data_vars = [getattr(a, 'variable', a) for a in args]
--> 210     result_var = func(*data_vars)
    211 
    212     if signature.num_outputs > 1:

~/anaconda/envs/uptodate/lib/python3.6/site-packages/xarray/core/computation.py in apply_variable_ufunc(func, *args, **kwargs)
    558             raise ValueError('unknown setting for dask array handling in '
    559                              'apply_ufunc: {}'.format(dask))
--> 560     result_data = func(*input_data)
    561 
    562     if signature.num_outputs == 1:

<ipython-input-203-974a8f0a89e8> in <lambda>(a, b)
     11 ridx = np.digitize(np.ravel(r), ri)
     12 
---> 13 func = lambda a, b: np.bincount(a, weights=b)
     14 xr.apply_ufunc(func, xr.DataArray(ridx,dims=['idx']), f)

ValueError: object too deep for desired array

Я заблудился там, где возникла ошибка, и помощь будет принята с благодарностью ...

1 Ответ

1 голос
/ 20 апреля 2019

Проблема в том, что apply_along_axis выполняет итерации по 1D-фрагментам первого аргумента для прикладной функции, а не для других. Если я правильно понимаю ваш сценарий использования, вы на самом деле хотите перебрать 1D-фрагменты весов (weights в подписи np.bincount) , , а не целочисленный массив (x в подписи np.bincount.

Одним из способов решения этой проблемы является написание тонкой функции-оболочки вокруг np.bincount, которая просто меняет порядок аргументов:

def wrapped_bincount(weights, x):
    return np.bincount(x, weights=weights)

Затем мы можем использовать np.apply_along_axis с этой функцией для вашего варианта использования:

def apply_bincount_along_axis(x, weights, axis=-1):
    return np.apply_along_axis(wrapped_bincount, axis, weights, x)

Наконец, мы можем обернуть эту новую функцию для использования с xarray, используя apply_ufunc, отметив, что она может автоматически распараллеливаться с dask (также обратите внимание, что нам не нужно предоставлять axis аргумент, потому что xarray автоматически переместит размер входного ядра dim в последнюю позицию в массиве weights перед применением функции):

def xbincount(x, weights):
    if len(x.dims) != 1:
        raise ValueError('x must be one-dimensional')

    dim, = x.dims
    nbins = x.max() + 1

    return xr.apply_ufunc(apply_bincount_along_axis, x, weights, 
        input_core_dims=[[dim], [dim]],
        output_core_dims=[['bin']], dask='parallelized',
        output_dtypes=[np.float], output_sizes={'bin': nbins})

Применение этой функции к вашему примеру выглядит следующим образом:

xbincount(ridx, f)

<xarray.DataArray (time: 2, bin: 5)>
array([[  0.      ,   7.934821,  34.066872,  51.118065, 152.769169],
       [  0.      ,  11.692989,  33.262936,  44.993856, 157.642972]])
Dimensions without coordinates: time, bin

По желанию также работает с массивами dask:

xbincount(ridx, f.chunk({'time': 1}))

<xarray.DataArray (time: 2, bin: 5)>
dask.array<shape=(2, 5), dtype=float64, chunksize=(1, 5)>
Dimensions without coordinates: time, bin
...