Я пытаюсь выполнить несколько сложную операцию над кадром данных, включающую группировку и операции с массивными массивами. Я могу заставить операцию работать в Pandas, но эквивалентный код в Dask дает мне ошибку, которую я не понял, как обойти.
У меня есть два вида элементов в моем фрейме данных: формы ицвета. Каждый элемент связан с вектором, представленным в виде пустого массива.
from pandas import DataFrame
from numpy import array
from scipy.spatial.distance import cosine
from numpy import mean as array_mean
p = DataFrame({
"type": ["shape", "shape", "color", "color", "color"],
"vector": [array([1.0, 1.1]),
array([0.8, 0.9]),
array([0.6, 0.8]),
array([1.1, 1.2]),
array([0.7, 0.9])
]
})
type vector
0 shape [1.0, 1.1]
1 shape [0.8, 0.9]
2 color [0.6, 0.8]
3 color [1.1, 1.2]
4 color [0.7, 0.9]
Я хочу сделать следующее:
- Группировать элементы по типу.
- Возьмите средний вектор для каждой группы.
- Для каждого элемента вычислите косинусное расстояние его вектора до среднего значения его группы и отсортируйте в пределах группы по этому расстоянию.
Следующая функциявыполняет это с помощью панд.
def pandas_ordering(f):
means = f.groupby("type")["vector"].apply(array_mean).to_frame().rename(columns={"vector":"mean"})
f = f.merge(means, left_on="type", right_index=True)
f["cosine distance"] = f.apply(lambda row:cosine(row["vector"], row["mean"]), axis=1)
return f.groupby("type", group_keys=False).apply(lambda x:x.sort_values("cosine distance"))
pandas_ordering(p)
type vector mean cosine distance
4 color [0.7, 0.9] [0.8000000000000002, 0.9666666666666667] 0.000459
2 color [0.6, 0.8] [0.8000000000000002, 0.9666666666666667] 0.001144
3 color [1.1, 1.2] [0.8000000000000002, 0.9666666666666667] 0.001280
0 shape [1.0, 1.1] [0.9, 1.0] 0.000012
1 shape [0.8, 0.9] [0.9, 1.0] 0.000019
Я переписал функцию в Dask. Логика идентична, а код почти идентичен, за исключением пары meta
украшений.
import dask.dataframe as dd
f = dd.from_pandas(p, npartitions=1)
def dask_ordering(f):
means = f.groupby("type")["vector"].apply(array_mean, meta="object").to_frame().rename(columns={0:"mean"})
f = f.merge(means, left_on="type", right_index=True)
f["cosine distance"] = f.apply(lambda row:cosine(row["vector"], row["mean"]), axis=1, meta="object")
f.groupby("type", group_keys=False).apply(lambda x:x.sort_values("cosine distance"), meta="object")
return f
Однако версия Dask выдает ошибку, когда пытается объединить систему средств с оригиналом. кадр векторов.
dask_ordering(f).compute()
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-120-46dd96a5db68> in <module>
----> 1 dask_ordering(f).compute()
<ipython-input-119-49ddda5479b1> in dask_ordering(f)
5 def dask_ordering(f):
6 means = f.groupby("type")["vector"].apply(array_mean, meta="object").to_frame().rename(columns={0:"mean"})
----> 7 f = f.merge(means, left_on="type", right_index=True)
8 f["cosine distance"] = f_1.apply(lambda row:cosine(row["vector"], row["mean"]), axis=1, meta="object")
9 f.groupby("type", group_keys=False).apply(lambda x:x.sort_values("cosine distance"), meta="object")
~/Documents/notebooks/env/lib/python3.6/site-packages/dask/dataframe/core.py in merge(self, right, how, on, left_on, right_on, left_index, right_index, suffixes, indicator, npartitions, shuffle)
3768 npartitions=npartitions,
3769 indicator=indicator,
-> 3770 shuffle=shuffle,
3771 )
3772
~/Documents/notebooks/env/lib/python3.6/site-packages/dask/dataframe/multi.py in merge(left, right, how, on, left_on, right_on, left_index, right_index, suffixes, indicator, npartitions, shuffle, max_branch)
490 right_index=right_index,
491 suffixes=suffixes,
--> 492 indicator=indicator,
493 )
494
~/Documents/notebooks/env/lib/python3.6/site-packages/dask/dataframe/multi.py in single_partition_join(left, right, **kwargs)
321 # new index will not necessarily correspond the current divisions
322
--> 323 meta = left._meta_nonempty.merge(right._meta_nonempty, **kwargs)
324 kwargs["empty_index_dtype"] = meta.index.dtype
325 name = "merge-" + tokenize(left, right, **kwargs)
~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/frame.py in merge(self, right, how, on, left_on, right_on, left_index, right_index, sort, suffixes, copy, indicator, validate)
7347 copy=copy,
7348 indicator=indicator,
-> 7349 validate=validate,
7350 )
7351
~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/reshape/merge.py in merge(left, right, how, on, left_on, right_on, left_index, right_index, sort, suffixes, copy, indicator, validate)
79 copy=copy,
80 indicator=indicator,
---> 81 validate=validate,
82 )
83 return op.get_result()
~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/reshape/merge.py in __init__(self, left, right, how, on, left_on, right_on, axis, left_index, right_index, sort, suffixes, copy, indicator, validate)
628 # validate the merge keys dtypes. We may need to coerce
629 # to avoid incompat dtypes
--> 630 self._maybe_coerce_merge_keys()
631
632 # If argument passed to validate,
~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/reshape/merge.py in _maybe_coerce_merge_keys(self)
1136 inferred_right in string_types and inferred_left not in string_types
1137 ):
-> 1138 raise ValueError(msg)
1139
1140 # datetimelikes must match exactly
ValueError: You are trying to merge on object and int64 columns. If you wish to proceed you should use pd.concat
Предположительно, Даск не понимает тип одного из столбцов, но я не уверен, какой именно. В частности, я не уверен, к чему относится упоминание столбца «int64» в сообщении об ошибке.
Промежуточный фрейм средних значений вектора выглядит одинаково и для Pandas, и для Dask.
p.groupby("type")["vector"].apply(array_mean).to_frame().rename(columns={"vector":"mean"})
и
f.groupby("type")["vector"].apply(array_mean, meta="object").to_frame().rename(columns={0:"mean"}).compute()
оба дают
mean
type
color [0.8000000000000002, 0.9666666666666667]
shape [0.9, 1.0]
Кто-нибудь знает, как заставить это работать в Даске?