Я использую spark 2.4 и python 3.7
# build spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("cos_sim") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
. Преобразование pandas df в искру df
# Pandas to Spark
df = spark_session.createDataFrame(pand_df)
Я сгенерировал некоторые случайные данные, вместо
import random
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id
def generate_random_data(num_usrs = 20, num_cols = 10):
cols = [str(i)+"_x" for i in range(num_cols)]
usrsdata = [ [random.random() for i in range(num_cols)] for i in range(num_usrs)]
# return pd.DataFrame(usrsdata, columns = cols)
return spark.createDataFrame(data = usrsdata, schema = cols)
df = generate_random_data()
df = df.withColumn("uid", monotonically_increasing_id())
df.limit(5).toPandas() # just for nice display of df (df not actually changed)
Преобразование столбцов df в вектор признаков
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=df.columns, outputCol="features")
assembled = assembler.transform(df).select(['uid', 'features'])
assembled.limit(2).toPandas()
Нормализация
from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol="features", outputCol="norm")
data = normalizer.transform(assembled)
data.limit(2).toPandas()
Рассчитать попарно косинусные сходства
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
mat = IndexedRowMatrix(data.select("uid", "norm").rdd\
.map(lambda row: IndexedRow(row.uid, row.norm.toArray()))).toBlockMatrix()
dot = mat.multiply(mat.transpose())
dot.toLocalMatrix().toArray()[:2] # displaying first 2 users only
Ссылки: Расчет косинусного сходства между всеми строками информационного кадра в pyspark