Перевести слияния панд в Dask - PullRequest
0 голосов
/ 29 октября 2019

Я пытаюсь выполнить несколько сложную операцию над кадром данных, включающую группировку и операции с массивными массивами. Я могу заставить операцию работать в Pandas, но эквивалентный код в Dask дает мне ошибку, которую я не понял, как обойти.

У меня есть два вида элементов в моем фрейме данных: формы ицвета. Каждый элемент связан с вектором, представленным в виде пустого массива.

from pandas import DataFrame
from numpy import array
from scipy.spatial.distance import cosine
from numpy import mean as array_mean


p = DataFrame({
    "type": ["shape", "shape", "color", "color", "color"],
    "vector": [array([1.0, 1.1]),
                  array([0.8, 0.9]),
                  array([0.6, 0.8]),
                  array([1.1, 1.2]),
                  array([0.7, 0.9])                  
                 ]
})


    type    vector
0   shape   [1.0, 1.1]
1   shape   [0.8, 0.9]
2   color   [0.6, 0.8]
3   color   [1.1, 1.2]
4   color   [0.7, 0.9]

Я хочу сделать следующее:

  1. Группировать элементы по типу.
  2. Возьмите средний вектор для каждой группы.
  3. Для каждого элемента вычислите косинусное расстояние его вектора до среднего значения его группы и отсортируйте в пределах группы по этому расстоянию.

Следующая функциявыполняет это с помощью панд.

def pandas_ordering(f):
    means = f.groupby("type")["vector"].apply(array_mean).to_frame().rename(columns={"vector":"mean"})
    f = f.merge(means, left_on="type", right_index=True)
    f["cosine distance"] = f.apply(lambda row:cosine(row["vector"], row["mean"]), axis=1)
    return f.groupby("type", group_keys=False).apply(lambda x:x.sort_values("cosine distance"))

pandas_ordering(p)

    type    vector       mean                                       cosine distance
4   color   [0.7, 0.9]  [0.8000000000000002, 0.9666666666666667]    0.000459
2   color   [0.6, 0.8]  [0.8000000000000002, 0.9666666666666667]    0.001144
3   color   [1.1, 1.2]  [0.8000000000000002, 0.9666666666666667]    0.001280
0   shape   [1.0, 1.1]  [0.9, 1.0]                               0.000012
1   shape   [0.8, 0.9]  [0.9, 1.0]                               0.000019

Я переписал функцию в Dask. Логика идентична, а код почти идентичен, за исключением пары meta украшений.

import dask.dataframe as dd

f = dd.from_pandas(p, npartitions=1)

def dask_ordering(f):
    means = f.groupby("type")["vector"].apply(array_mean, meta="object").to_frame().rename(columns={0:"mean"})
    f = f.merge(means, left_on="type", right_index=True)
    f["cosine distance"] = f.apply(lambda row:cosine(row["vector"], row["mean"]), axis=1, meta="object")
    f.groupby("type", group_keys=False).apply(lambda x:x.sort_values("cosine distance"), meta="object")
    return f

Однако версия Dask выдает ошибку, когда пытается объединить систему средств с оригиналом. кадр векторов.

dask_ordering(f).compute()

  ---------------------------------------------------------------------------
  ValueError                                Traceback (most recent call last)
  <ipython-input-120-46dd96a5db68> in <module>
  ----> 1 dask_ordering(f).compute()

  <ipython-input-119-49ddda5479b1> in dask_ordering(f)
        5 def dask_ordering(f):
        6     means = f.groupby("type")["vector"].apply(array_mean, meta="object").to_frame().rename(columns={0:"mean"})
  ----> 7     f = f.merge(means, left_on="type", right_index=True)
        8     f["cosine distance"] = f_1.apply(lambda row:cosine(row["vector"], row["mean"]), axis=1, meta="object")
        9     f.groupby("type", group_keys=False).apply(lambda x:x.sort_values("cosine distance"), meta="object")

  ~/Documents/notebooks/env/lib/python3.6/site-packages/dask/dataframe/core.py in merge(self, right, how, on, left_on, right_on, left_index, right_index, suffixes, indicator, npartitions, shuffle)
     3768             npartitions=npartitions,
     3769             indicator=indicator,
  -> 3770             shuffle=shuffle,
     3771         )
     3772 

  ~/Documents/notebooks/env/lib/python3.6/site-packages/dask/dataframe/multi.py in merge(left, right, how, on, left_on, right_on, left_index, right_index, suffixes, indicator, npartitions, shuffle, max_branch)
      490             right_index=right_index,
      491             suffixes=suffixes,
  --> 492             indicator=indicator,
      493         )
      494 

  ~/Documents/notebooks/env/lib/python3.6/site-packages/dask/dataframe/multi.py in single_partition_join(left, right, **kwargs)
      321     # new index will not necessarily correspond the current divisions
      322 
  --> 323     meta = left._meta_nonempty.merge(right._meta_nonempty, **kwargs)
      324     kwargs["empty_index_dtype"] = meta.index.dtype
      325     name = "merge-" + tokenize(left, right, **kwargs)

  ~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/frame.py in merge(self, right, how, on, left_on, right_on, left_index, right_index, sort, suffixes, copy, indicator, validate)
     7347             copy=copy,
     7348             indicator=indicator,
  -> 7349             validate=validate,
     7350         )
     7351 

  ~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/reshape/merge.py in merge(left, right, how, on, left_on, right_on, left_index, right_index, sort, suffixes, copy, indicator, validate)
       79         copy=copy,
       80         indicator=indicator,
  ---> 81         validate=validate,
       82     )
       83     return op.get_result()

  ~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/reshape/merge.py in __init__(self, left, right, how, on, left_on, right_on, axis, left_index, right_index, sort, suffixes, copy, indicator, validate)
      628         # validate the merge keys dtypes. We may need to coerce
      629         # to avoid incompat dtypes
  --> 630         self._maybe_coerce_merge_keys()
      631 
      632         # If argument passed to validate,

  ~/Documents/notebooks/env/lib/python3.6/site-packages/pandas/core/reshape/merge.py in _maybe_coerce_merge_keys(self)
     1136                     inferred_right in string_types and inferred_left not in string_types
     1137                 ):
  -> 1138                     raise ValueError(msg)
     1139 
     1140             # datetimelikes must match exactly

  ValueError: You are trying to merge on object and int64 columns. If you wish to proceed you should use pd.concat

Предположительно, Даск не понимает тип одного из столбцов, но я не уверен, какой именно. В частности, я не уверен, к чему относится упоминание столбца «int64» в сообщении об ошибке.

Промежуточный фрейм средних значений вектора выглядит одинаково и для Pandas, и для Dask.

p.groupby("type")["vector"].apply(array_mean).to_frame().rename(columns={"vector":"mean"})

и

f.groupby("type")["vector"].apply(array_mean, meta="object").to_frame().rename(columns={0:"mean"}).compute()

оба дают

       mean
type
color  [0.8000000000000002, 0.9666666666666667]
shape  [0.9, 1.0]

Кто-нибудь знает, как заставить это работать в Даске?

1 Ответ

1 голос
/ 30 октября 2019

Проблема заключается в том, что вы создаете means фрейм данных - индексом является int, а не str (вероятно, это ошибка, которую стоит поднять в github) (по какой-то причине - этоможет автоматически преобразовываться в тип categorical или что-то подобное.)

В то же время функция «ниже» - это обходной путь.

def dask_ordering(f):
    means = f.groupby("type")["vector"].apply(array_mean, meta="object").to_frame(name="mean")    
    means['idx'] = means.index
    means['idx'] = means['idx'].astype(str)
    f = f.merge(means, left_on="type", right_on="idx")    
    f["cosine distance"] = f.apply(lambda row:cosine(row["vector"], row["mean"]), axis=1, meta="object")
    f.groupby("type", group_keys=False).apply(lambda x:x.sort_values("cosine distance"), meta="object")
    return f
...