В Dask можно ли преобразовать тензоры в двумерные матрицы в Dask без предварительного вычисления размера? - PullRequest
0 голосов
/ 10 ноября 2018

Пытаясь создать базовый класс Python, способный векторизовать скалярные функции в Dask, я столкнулся с проблемой преобразования тензоров в двумерные матрицы. Решение этой проблемы будет способствовать созданию конвейеров sklearn, которые будут взаимозаменяемо работать с типами данных Numpy, Pandas и Dask.

Следующий код работает на Dask 0.18.2 , но не работает на Dask 0.19.4 и 0.20.0 :

import dask
import dask.array
import dask.dataframe
import numpy
import pandas

def and1(x): return numpy.array([x, x+1], dtype=numpy.float32)

expected = numpy.array([[10, 11, 20, 21], 
                        [30, 31, 40, 41]], 
                       dtype=numpy.float32)

df = pandas.DataFrame.from_dict({
  'c1': [10, 30], 'c2': [20, 40]
})

ddf = dask.dataframe.from_pandas(df, npartitions=2)

# Dask generalized universal function that outputs 2 values per input value
guf = dask.array.gufunc(
    pyfunc=and1,
    signature='()->(n)',
    output_dtypes=numpy.float32,
    output_sizes={'n': 2},
    vectorize=True,
    allow_rechunk = False
)

da = guf(ddf)
da_reshaped = da.reshape((-1, numpy.prod(da.shape[1:])))
npa = da_reshaped.compute()

assert da.shape == (2, 2, 2)  # (input rows, input cols, outputs per cols)
numpy.testing.assert_array_equal(expected, npa)

В Dask 0.19.4 и 0.20.0 reshape вызывает ValueError, поскольку первый элемент формы da s имеет значение NaN (подробности см. В трассировке стека).

ValueErrorTraceback (most recent call last)
<ipython-input-847-ad2c41e1d88c> in <module>
     24 
     25 da = guf(ddf)
---> 26 da_r = da.reshape((-1, numpy.prod(da.shape[1:])))
     27 npa = da_r.compute()
     28 

/opt/conda/lib/python3.6/site-packages/dask/array/core.py in reshape(self, *shape)
   1398         if len(shape) == 1 and not isinstance(shape[0], Number):
   1399             shape = shape[0]
-> 1400         return reshape(self, shape)
   1401 
   1402     def topk(self, k, axis=-1, split_every=None):

/opt/conda/lib/python3.6/site-packages/dask/array/reshape.py in reshape(x, shape)
    160         if len(shape) == 1 and x.ndim == 1:
    161             return x
--> 162         missing_size = sanitize_index(x.size / reduce(mul, known_sizes, 1))
    163         shape = tuple(missing_size if s == -1 else s for s in shape)
    164 

/opt/conda/lib/python3.6/site-packages/dask/array/slicing.py in sanitize_index(ind)
     58                      _sanitize_index_element(ind.step))
     59     elif isinstance(ind, Number):
---> 60         return _sanitize_index_element(ind)
     61     elif is_dask_collection(ind):
     62         return ind

/opt/conda/lib/python3.6/site-packages/dask/array/slicing.py in _sanitize_index_element(ind)
     20     """Sanitize a one-element index."""
     21     if isinstance(ind, Number):
---> 22         ind2 = int(ind)
     23         if ind2 != ind:
     24             raise IndexError("Bad index.  Must be integer-like: %s" % ind)

ValueError: cannot convert float NaN to integer

Есть ли другой способ изменить Dask Arrays в Dask 0.20.0+ без предварительного вычисления размера? Если это так, происходит ли изменение формы операции с постоянным временем, как это выглядит в Numpy?

Я хочу создать матрицу (shape = (R, C)) так, чтобы первая ось не изменялась, но все последующие оси объединялись в порядке "C" (по умолчанию в Dask и Numpy).

(Кстати, я уже видел: Изменение формы массива dask (получено из столбца данных dask) )

...