Вы можете рассчитать косинусное сходство только для двух векторов, а не для двух чисел.При этом, если столбцы с именем CustomerValue являются различными компонентами вектора, представляющего функцию, для которой вы хотите получить сходство между двумя клиентами, вы можете сделать это путем транспонирования фрейма данных, а затем выполнить объединение с CuatomerValues.
Транспонирование может быть выполнено с разнесением (более подробно о транспонировании фрейма данных здесь ):
from pyspark.sql import functions as F
kvs = F.explode(F.array([
F.struct(F.lit(c).alias('key'), F.columm(c).alias('value')) for c in ['CustomerValue1', 'CustomerValue2']
])).alias('kvs')
dft1 = (df1.select(['CustomerID', kvs])
.select('CustomerID', F.column('kvs.name').alias('column_name'), F.column('kvs.value').alias('column_value'))
)
dft2 = (df2.select(['CustomerID', kvs])
.select('CustomerID', F.column('kvs.name').alias('column_name'), F.column('kvs.value').alias('column_value'))
)
, где dft1
и dft2
обозначают транспонированные данныекадры.После того, как вы их транспонировали, вы можете объединить их по именам столбцов:
dft2 = (dft2.withColumnRenamed('CustomerID', 'CustomerID2')
.withColumnRenamed('column_value', 'column_value2')
)
cosine = (dft1.join(dft2, dft1.column_name = dft2.column_name)
.groupBy('CustomerID' , 'CustomerID2')
.agg(F.sum(F.column('column_value')*F.column('column_value2')).alias('cosine_similarity'))
)
Теперь в cosine
у вас есть три столбца: CustomerID из первого и второго фреймов данных и косинусное сходство (при условии, чтозначения были нормализованы в первую очередь).Преимуществом этого является то, что у вас есть только строки для пар CustomerID, которые имеют ненулевое сходство (в случае нулевых значений для некоторых CustomerID).Для вашего примера:
df1:
CustomerID CustomerValue CustomerValue2
12 .17 .08
df2:
CustomerID CustomerValue CustomerValue
15 .17 .14
16 .40 .43
18 .86 .09
косинус:
CustomID CustomID2 cosine_similarity
12 15 .0401
12 16 .1024
12 18 .1534
Конечно, это не настоящиесходства косинусов пока нет, сначала нужно нормализовать значения.Вы можете сделать это с группой следующим образом:
(df.groupBy('CustomerID')
.agg(F.sqrt(F.sum(F.column('column_value')*F.column('column_value'))).alias('norm'))
.select('CustomerID', F.column('column_name'), (F.column('column_value')/F.column('norm')).alias('column_value_norm'))
)
После нормализации столбцов ваши косинусные сходства станут следующими:
CustomID CustomID2 cosine_similarity
12 15 .970
12 16 .928
12 18 .945
Большие значения подобия обусловлены низкой размерностью (дватолько компоненты).