Я получаю ошибку «AttributeError: 'bool' object has no attribute 'any'» (объект 'bool' не имеет атрибута 'any') после вызова dd.pivot_table. Файл данных занимает более 20 ГБ, поэтому я использую dask. Я пробовал другие способы выполнить эту операцию; этот кажется самым простым, но мне не удаётся решить эту конкретную проблему.
Мой датафрейм — это dask DataFrame, прочитанный с помощью dask.dataframe.read_csv. После удаления пропусков (NA) и дубликатов я пытаюсь построить по этим данным сводную таблицу (pivot).
Датафрейм — пример строк
userid touchpoint rank source
0 10015072.0 first 1 organic_Mobile_landing
1 10015072.0 Ads 2 organic_Mobile_landing
2 10055982.0 first 1 mobvista_int
3 10055982.0 Ads 2 mobvista_int
4 10240954.0 first 1 Facebook Ads
Операция
import pandas as pd
import dask.dataframe as dd
df=dd.read_csv("data.csv",dtype={'userid': 'float64','rank':'str'})
df=df.dropna()
df=df.drop_duplicates(keep='first')
df = df.categorize("rank")
p=df.pivot_table(index="userid", columns="rank",values="touchpoint")
p.compute()
Ошибка
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-11-f015f34583af> in <module>()
----> 1 p.compute()
/home/centos/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs)
173 dask.base.compute
174 """
--> 175 (result,) = compute(self, traverse=False, **kwargs)
176 return result
177
/home/centos/anaconda3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
444 keys = [x.__dask_keys__() for x in collections]
445 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446 results = schedule(dsk, keys, **kwargs)
447 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
448
/home/centos/anaconda3/lib/python3.5/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
80 get_id=_thread_get_id,
81 pack_exception=pack_exception,
---> 82 **kwargs
83 )
84
/home/centos/anaconda3/lib/python3.5/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
489 _execute_task(task, data) # Re-execute locally
490 else:
--> 491 raise_exception(exc, tb)
492 res, worker_id = loads(res_info)
493 state["cache"][key] = res
/home/centos/anaconda3/lib/python3.5/site-packages/dask/compatibility.py in reraise(exc, tb)
128 if exc.__traceback__ is not tb:
129 raise exc.with_traceback(tb)
--> 130 raise exc
131
132 import pickle as cPickle
/home/centos/anaconda3/lib/python3.5/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
231 try:
232 task, data = loads(task_info)
--> 233 result = _execute_task(task, data)
234 id = get_id()
235 result = dumps((result, id))
/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in <listcomp>(.0)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in <listcomp>(.0)
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
--> 118 args2 = [_execute_task(a, cache) for a in args]
119 return func(*args2)
120 elif not ishashable(arg):
/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
113 """
114 if isinstance(arg, list):
--> 115 return [_execute_task(a, cache) for a in arg]
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in <listcomp>(.0)
113 """
114 if isinstance(arg, list):
--> 115 return [_execute_task(a, cache) for a in arg]
116 elif istask(arg):
117 func, args = arg[0], arg[1:]
/home/centos/anaconda3/lib/python3.5/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
/home/centos/anaconda3/lib/python3.5/site-packages/dask/compatibility.py in apply(func, args, kwargs)
105 def apply(func, args, kwargs=None):
106 if kwargs:
--> 107 return func(*args, **kwargs)
108 else:
109 return func(*args)
/home/centos/anaconda3/lib/python3.5/site-packages/dask/dataframe/methods.py in pivot_count(df, index, columns, values)
316 # make dtype deterministic, always coerce to np.float64
317 return pd.pivot_table(
--> 318 df, index=index, columns=columns, values=values, aggfunc="count"
319 ).astype(np.float64)
320
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/reshape/pivot.py in pivot_table(data, values, index, columns, aggfunc, fill_value, margins, dropna, margins_name, observed)
94
95 grouped = data.groupby(keys, observed=observed)
---> 96 agged = grouped.agg(aggfunc)
97 if dropna and isinstance(agged, ABCDataFrame) and len(agged.columns):
98 agged = agged.dropna(how="all")
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/groupby/generic.py in aggregate(self, arg, *args, **kwargs)
1453 @Appender(_shared_docs["aggregate"])
1454 def aggregate(self, arg=None, *args, **kwargs):
-> 1455 return super().aggregate(arg, *args, **kwargs)
1456
1457 agg = aggregate
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/groupby/generic.py in aggregate(self, func, *args, **kwargs)
227 func = _maybe_mangle_lambdas(func)
228
--> 229 result, how = self._aggregate(func, _level=_level, *args, **kwargs)
230 if how is None:
231 return result
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/base.py in _aggregate(self, arg, *args, **kwargs)
354
355 if isinstance(arg, str):
--> 356 return self._try_aggregate_string_function(arg, *args, **kwargs), None
357
358 if isinstance(arg, dict):
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/base.py in _try_aggregate_string_function(self, arg, *args, **kwargs)
303 if f is not None:
304 if callable(f):
--> 305 return f(*args, **kwargs)
306
307 # people may try to aggregate on a non-callable attribute
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/groupby/generic.py in count(self)
1603 blk = map(make_block, map(counter, val), loc)
1604
-> 1605 return self._wrap_agged_blocks(data.items, list(blk))
1606
1607 def nunique(self, dropna=True):
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/groupby/generic.py in _wrap_agged_blocks(self, items, blocks)
1562 result = result.T
1563
-> 1564 return self._reindex_output(result)._convert(datetime=True)
1565
1566 def _iterate_column_groupbys(self):
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/groupby/groupby.py in _reindex_output(self, output)
2469 levels_list = [ping.group_index for ping in groupings]
2470 index, _ = MultiIndex.from_product(
-> 2471 levels_list, names=self.grouper.names
2472 ).sortlevel()
2473
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/indexes/multi.py in sortlevel(self, level, ascending, sort_remaining)
2361 sortorder = level[0]
2362
-> 2363 indexer = indexer_from_factorized(primary, primshp, compress=False)
2364
2365 if not ascending:
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/sorting.py in indexer_from_factorized(labels, shape, compress)
178
179 def indexer_from_factorized(labels, shape, compress=True):
--> 180 ids = get_group_index(labels, shape, sort=True, xnull=False)
181
182 if not compress:
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/sorting.py in get_group_index(labels, shape, sort, xnull)
63 labels = map(ensure_int64, labels)
64 if not xnull:
---> 65 labels, shape = map(list, zip(*map(maybe_lift, labels, shape)))
66
67 labels = list(labels)
/home/centos/anaconda3/lib/python3.5/site-packages/pandas/core/sorting.py in maybe_lift(lab, size)
59 # promote nan values (assigned -1 label in lab array)
60 # so that all output values are non-negative
---> 61 return (lab + 1, size + 1) if (lab == -1).any() else (lab, size)
62
63 labels = map(ensure_int64, labels)
AttributeError: 'bool' object has no attribute 'any'