У меня есть примерный фрейм данных:
df = pd.DataFrame({'http_user':['user1']*10, 'dst':['1111'] * 10, 'dst_port':[80] * 10, 'content': np.random.randint(0, 1024, size=10)})
ddf = dd.from_pandas(df, npartitions=5)
group = ddf.groupby(['http_user', 'dst', 'dst_port'])
meta_df = make_meta(('average', 'f8'))
meta_df = pd.MultiIndex(levels=[['user'], ['111'], [443]], codes=[[]] * 3, names=['http_user', 'dst', 'dst_port'])
with_apply = group.content.apply(lambda s: s.mean(), meta=meta_df)
without_apply = group.content.mean()
without_apply.to_frame('average').assign(average2=without_apply) # this works
without_apply.to_frame('average').assign(average2=with_apply) # This doesn't
Исключение составляет:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-120-190777069746> in <module>
----> 1 without_apply.to_frame('average').assign(average2=with_apply)
~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/dataframe/core.py in assign(self, **kwargs)
3519 # Figure out columns of the output
3520 df2 =
self._meta_nonempty.assign(**_extract_meta(kwargs, nonempty=True))
-> 3521 return elemwise(methods.assign, self, *pairs, meta=df2)
3522
3523 @derived_from(pd.DataFrame, ua_args=["index"])
~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/dataframe/core.py in elemwise(op, *args, **kwargs)
4273 from .multi import _maybe_align_partitions
4274
-> 4275 args = _maybe_align_partitions(args)
4276 dasks = [arg for arg in args if isinstance(arg, (_Frame, Scalar, Array))]
4277 dfs = [df for df in dasks if isinstance(df, _Frame)]
~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/dataframe/multi.py in _maybe_align_partitions(args)
162 divisions = dfs[0].divisions
163 if not all(df.divisions == divisions for df in dfs):
--> 164 dfs2 = iter(align_partitions(*dfs)[0])
165 return [a if not isinstance(a, _Frame) else next(dfs2) for a in args]
166 return args
~/virtualenvs/statistics3/lib/python3.6/site-packages/dask/dataframe/multi.py in align_partitions(*dfs)
117 if not all(df.known_divisions for df in dfs1):
118 raise ValueError(
--> 119 "Not all divisions are known, can't align "
120 "partitions. Please use `set_index` "
121 "to set the index."
ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.
В моем анализе у меня есть некоторые агрегации, которые можно сделать, создав пользовательский Aggregator
и используя его в группах. Назначение работ. Но некоторые из них нуждаются в передаче дополнительных данных, так что, насколько мне известно, я могу использовать только apply, поскольку я не могу передать данные в функции агрегатора (chunk
, agg
, finalize
). Из документации, хотя использование Aggregator
объект является предпочтительным. Ожидается ли такое поведение применения?