pivot_table в Dask выдаёт AttributeError: 'bool' object has no attribute 'any' - PullRequest
0 голосов
/ 10 октября 2019

После вызова dd.pivot_table я получаю ошибку AttributeError: 'bool' object has no attribute 'any' (объект 'bool' не имеет атрибута 'any'). Файл данных занимает более 20 ГБ, поэтому я использую dask. Я пробовал и другие способы выполнить эту операцию; этот кажется самым простым, но решить эту конкретную проблему мне не удаётся.

Мой датафрейм — это dask dataframe, прочитанный с помощью dask.dataframe.read_csv. После удаления пропусков (NA) и дубликатов я пытаюсь построить по этим данным сводную таблицу (pivot).

Датафрейм — пример строк

    userid  touchpoint  rank    source
0   10015072.0  first   1   organic_Mobile_landing
1   10015072.0  Ads     2   organic_Mobile_landing
2   10055982.0  first   1   mobvista_int
3   10055982.0  Ads     2   mobvista_int
4   10240954.0  first   1   Facebook Ads

Операция

import pandas as pd
import dask.dataframe as dd
# Read the CSV as a dask dataframe, forcing userid to float64 and rank to str.
df=dd.read_csv("data.csv",dtype={'userid': 'float64','rank':'str'})
# Drop rows that contain missing values.
df=df.dropna()
# Remove duplicate rows, keeping the first occurrence of each.
df=df.drop_duplicates(keep='first')
# Categorize the "rank" column before pivoting.
# NOTE(review): presumably needed because dask's pivot_table expects the
# `columns` column to be categorical -- confirm against the dask docs.
df = df.categorize("rank")
# Build the pivot: userid as index, rank as columns, touchpoint as values.
# NOTE(review): the sample rows show touchpoint holding strings ("first",
# "Ads") -- confirm that a non-numeric `values` column is intended here.
p=df.pivot_table(index="userid", columns="rank",values="touchpoint")
# Trigger the computation; this is the call at which the AttributeError is raised.
p.compute()

Ошибка

      ---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-11-f015f34583af> in <module>()
----> 1 p.compute()

/home/centos/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
    173         dask.base.compute
    174         """
--> 175         (result,) = compute(self, traverse=False, **kwargs)
    176         return result
    177 

/home/centos/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
    444     keys = [x.__dask_keys__() for x in collections]
    445     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446     results = schedule(dsk, keys, **kwargs)
    447     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    448 

/home/centos/anaconda3/lib/python3.5/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     80         get_id=_thread_get_id,
     81         pack_exception=pack_exception,
---> 82         **kwargs
     83     )
     84 

/home/centos/anaconda3/lib/python3.5/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    489                         _execute_task(task, data)  # Re-execute locally
    490                     else:
--> 491                         raise_exception(exc, tb)
    492                 res, worker_id = loads(res_info)
    493                 state["cache"][key] = res

/home/centos/anaconda3/lib/python3.5/site-packages/dask/compatibility.py in reraise(exc, tb)
    128         if exc.__traceback__ is not tb:
    129             raise exc.with_traceback(tb)
--> 130         raise exc
    131 
    132     import pickle as cPickle

/home/centos/anaconda3/lib/python3.5/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    231     try:
    232         task, data = loads(task_info)
--> 233         result = _execute_task(task, data)
    234         id = get_id()
    235         result = dumps((result, id))

/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in <listcomp>(.0)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in <listcomp>(.0)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    113     """
    114     if isinstance(arg, list):
--> 115         return [_execute_task(a, cache) for a in arg]
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]

/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in <listcomp>(.0)
    113     """
    114     if isinstance(arg, list):
--> 115         return [_execute_task(a, cache) for a in arg]
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]

/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

/home/centos/anaconda3/lib/python3.5/site-packages/dask/compatibility.py in apply(func, args, kwargs)
    105     def apply(func, args, kwargs=None):
    106         if kwargs:
--> 107             return func(*args, **kwargs)
    108         else:
    109             return func(*args)

/home/centos/anaconda3/lib/python3.5/site-packages/dask/dataframe/methods.py in pivot_count(df, index, columns, values)
    316     # make dtype deterministic, always coerce to np.float64
    317     return pd.pivot_table(
--> 318         df, index=index, columns=columns, values=values, aggfunc="count"
    319     ).astype(np.float64)
    320 

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/reshape/pivot.py in pivot_table(data, values, index, columns, aggfunc, fill_value, margins, dropna, margins_name, observed)
     94 
     95     grouped = data.groupby(keys, observed=observed)
---> 96     agged = grouped.agg(aggfunc)
     97     if dropna and isinstance(agged, ABCDataFrame) and len(agged.columns):
     98         agged = agged.dropna(how="all")

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/groupby/generic.py in aggregate(self, arg, *args, **kwargs)
   1453     @Appender(_shared_docs["aggregate"])
   1454     def aggregate(self, arg=None, *args, **kwargs):
-> 1455         return super().aggregate(arg, *args, **kwargs)
   1456 
   1457     agg = aggregate

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/groupby/generic.py in aggregate(self, func, *args, **kwargs)
    227         func = _maybe_mangle_lambdas(func)
    228 
--> 229         result, how = self._aggregate(func, _level=_level, *args, **kwargs)
    230         if how is None:
    231             return result

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/base.py in _aggregate(self, arg, *args, **kwargs)
    354 
    355         if isinstance(arg, str):
--> 356             return self._try_aggregate_string_function(arg, *args, **kwargs), None
    357 
    358         if isinstance(arg, dict):

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/base.py in _try_aggregate_string_function(self, arg, *args, **kwargs)
    303         if f is not None:
    304             if callable(f):
--> 305                 return f(*args, **kwargs)
    306 
    307             # people may try to aggregate on a non-callable attribute

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/groupby/generic.py in count(self)
   1603         blk = map(make_block, map(counter, val), loc)
   1604 
-> 1605         return self._wrap_agged_blocks(data.items, list(blk))
   1606 
   1607     def nunique(self, dropna=True):

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/groupby/generic.py in _wrap_agged_blocks(self, items, blocks)
   1562             result = result.T
   1563 
-> 1564         return self._reindex_output(result)._convert(datetime=True)
   1565 
   1566     def _iterate_column_groupbys(self):

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/groupby/groupby.py in _reindex_output(self, output)
   2469         levels_list = [ping.group_index for ping in groupings]
   2470         index, _ = MultiIndex.from_product(
-> 2471             levels_list, names=self.grouper.names
   2472         ).sortlevel()
   2473 

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/indexes/multi.py in sortlevel(self, level, ascending, sort_remaining)
   2361                 sortorder = level[0]
   2362 
-> 2363             indexer = indexer_from_factorized(primary, primshp, compress=False)
   2364 
   2365             if not ascending:

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/sorting.py in indexer_from_factorized(labels, shape, compress)
    178 
    179 def indexer_from_factorized(labels, shape, compress=True):
--> 180     ids = get_group_index(labels, shape, sort=True, xnull=False)
    181 
    182     if not compress:

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/sorting.py in get_group_index(labels, shape, sort, xnull)
     63     labels = map(ensure_int64, labels)
     64     if not xnull:
---> 65         labels, shape = map(list, zip(*map(maybe_lift, labels, shape)))
     66 
     67     labels = list(labels)

/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/sorting.py in maybe_lift(lab, size)
     59         # promote nan values (assigned -1 label in lab array)
     60         # so that all output values are non-negative
---> 61         return (lab + 1, size + 1) if (lab == -1).any() else (lab, size)
     62 
     63     labels = map(ensure_int64, labels)

AttributeError: 'bool' object has no attribute 'any'
...