«ValueError: При использовании всех скалярных значений необходимо передать индекс» при использовании Dask DataFrame.apply () - PullRequest
0 голосов
/ 26 марта 2019

У меня есть некоторые данные, которые я хотел бы расширить с одного столбца до нескольких столбцов с помощью Dask.Я делаю это с помощью метода Dask DataFrame apply и пользовательской функции, которая возвращает новый объект Pandas DataFrame.Однако, когда я пытаюсь это сделать, я получаю ValueError: If using all scalar values, you must pass an index.Вот минимальный случай воспроизведения:

import dask.dataframe as dd
import pandas as pd

pd_df = pd.DataFrame({'a': [1, 2, 3, 4, 5]}, dtype=float)
df = dd.from_pandas(pd_df, npartitions=2)

def custom_fn(row):
    num = row['a']
    frame = pd.DataFrame({
        'squared': [num * num],
        'x2': [num * 2],
    }, dtype=float, index=[0])
    return frame

new_frame = df.apply(custom_fn, axis=1, meta={
    'squared': float,
    'x2': float,
}, result_type='expand')

new_frame.head()

И трассировка стека:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-38-6aaf3a5d32b2> in <module>()
     12 }, result_type='expand')
     13 
---> 14 new_frame.head()

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/dataframe/core.pyc in head(self, n, npartitions, compute)
    896 
    897         if compute:
--> 898             result = result.compute()
    899         return result
    900 

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/base.pyc in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/threaded.pyc in get(dsk, result, cache, num_workers, pool, **kwargs)
     74     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     75                         cache=cache, get_id=_thread_get_id,
---> 76                         pack_exception=pack_exception, **kwargs)
     77 
     78     # Cleanup pools associated to dead threads

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/local.pyc in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    458                         _execute_task(task, data)  # Re-execute locally
    459                     else:
--> 460                         raise_exception(exc, tb)
    461                 res, worker_id = loads(res_info)
    462                 state['cache'][key] = res

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/local.pyc in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    228     try:
    229         task, data = loads(task_info)
--> 230         result = _execute_task(task, data)
    231         id = get_id()
    232         result = dumps((result, id))

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in _execute_task(arg, cache, dsk)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/optimization.pyc in __call__(self, *args)
    940                              % (len(self.inkeys), len(args)))
    941         return core.get(self.dsk, self.outkey,
--> 942                         dict(zip(self.inkeys, args)))
    943 
    944     def __reduce__(self):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in get(dsk, out, cache)
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/core.pyc in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/dataframe/core.pyc in apply_and_enforce(*args, **kwargs)
   3792     func = kwargs.pop('_func')
   3793     meta = kwargs.pop('_meta')
-> 3794     df = func(*args, **kwargs)
   3795     if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   3796         if not len(df):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/dask/utils.pyc in __call__(self, obj, *args, **kwargs)
    714 
    715     def __call__(self, obj, *args, **kwargs):
--> 716         return getattr(obj, self.method)(*args, **kwargs)
    717 
    718     def __reduce__(self):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/frame.pyc in apply(self, func, axis, broadcast, raw, reduce, result_type, args, **kwds)
   6485                          args=args,
   6486                          kwds=kwds)
-> 6487         return op.get_result()
   6488 
   6489     def applymap(self, func):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in get_result(self)
    149             return self.apply_raw()
    150 
--> 151         return self.apply_standard()
    152 
    153     def apply_empty_result(self):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in apply_standard(self)
    258 
    259         # wrap results
--> 260         return self.wrap_results()
    261 
    262     def apply_series_generator(self):

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in wrap_results(self)
    306         if len(results) > 0 and is_sequence(results[0]):
    307 
--> 308             return self.wrap_results_for_axis()
    309 
    310         # dict of scalars

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in wrap_results_for_axis(self)
    382         # we have requested to expand
    383         if self.result_type == 'expand':
--> 384             result = self.infer_to_same_shape()
    385 
    386         # we have a non-series and don't want inference

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/apply.pyc in infer_to_same_shape(self)
    400         results = self.results
    401 
--> 402         result = self.obj._constructor(data=results)
    403         result = result.T
    404 

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/frame.pyc in __init__(self, data, index, columns, dtype, copy)
    390                                  dtype=dtype, copy=copy)
    391         elif isinstance(data, dict):
--> 392             mgr = init_dict(data, index, columns, dtype=dtype)
    393         elif isinstance(data, ma.MaskedArray):
    394             import numpy.ma.mrecords as mrecords

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in init_dict(data, index, columns, dtype)
    210         arrays = [data[k] for k in keys]
    211 
--> 212     return arrays_to_mgr(arrays, data_names, index, columns, dtype=dtype)
    213 
    214 

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in arrays_to_mgr(arrays, arr_names, index, columns, dtype)
     49     # figure out the index, if necessary
     50     if index is None:
---> 51         index = extract_index(arrays)
     52     else:
     53         index = ensure_index(index)

/nail/home/shawn/pg/research_ipython/virtualenv_run/local/lib/python2.7/site-packages/pandas/core/internals/construction.pyc in extract_index(data)
    306 
    307         if not indexes and not raw_lengths:
--> 308             raise ValueError('If using all scalar values, you must pass'
    309                              ' an index')
    310 

ValueError: If using all scalar values, you must pass an index

Я могу опустить result_type='expand' и meta kwarg, чтобы получить DataFrame, полный DataFrames, которые я возвратилв методе, но я хочу, чтобы он расширял встроенный.Я использую Dask 1.1.4 и Pandas 0.24.1 на Python 2.7.6.

РЕДАКТИРОВАТЬ: я обнаружил, что я могу расширить строки позже так:

new_frame = df.apply(custom_fn, axis=1)

dd.concat([
    data for _, data in new_frame.iteritems()
], interleave_partitions=True).head()

Это немного грязно, но, похоже, пока работает, по крайней мере.

...