CoordinateMatrix.entries не выполняется - PullRequest
0 голосов
/ 07 сентября 2018

Я экспериментирую с различными методами вычисления подобия элемент-элемент для реализации совместной фильтрации.В примитивном наборе данных о взаимодействиях между пользовательскими элементами у меня порядка 3,9 мм пользователей и ровно 1 933 элементов.Среди других измерений сходства я играю с косинусным сходством между каждым элементом, и делаю это как с, так и без центрирования векторов элементов.Выполненный без центрирования среднего значения, рабочий процесс выглядит примерно так:

# setup - query user-item interactions
df = spark.sql("select user, item, r_ui from user_item_interactions")
items = [i.item for i in df.select("item").distinct().collect()

# pivot to create utility matrix (rows=users, columns=items, elements=user-item rating)
utility = df.groupBy('user').pivot("item", items).max("r_ui").fillna(0)

# without mean centering
feature_rdd = utility.rdd.map(lambda row : row[1:])
feature_mat = RowMatrix(feature_rdd)

# use RowMatrix cosine similarity method
cosine = feature_mat.columnSimilarities()

# output is of type CoordinateMatrix, so can access entries
cos_df = cosine.entries.toDF()
# cos_df.doStuff(...)

# with mean centering
assembler = VectorAssembler(inputCols=utility.drop('visitor_id').columns, outputCol='features')
# vectorize and apply scaler
vectorized_utility = assembler.transform(utility)
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withMean=True, withStd=False)
scaled_utility_inter = scaler.fit(vectorized_utility.transform(vectorized_utility)

# see footnotes for "extract" function --
# just takes vector entries and blows back out to DF columns
scaled_utility = scaled_utiltiy_inter.select("user", "scaled_features").rdd.map(extract).toDF(["user"] + items)

# repeat approach from above
scaled_feature_rdd = scaled_utility.rdd.map(lambda row : row[1:])
scaled_feature_mat = RowMatrix(scaled_feature_rdd)
scaled_cosine = scaled_feature_mat.columnSimilarities()
scaled_cos_df = scaled_cosine.entries.toDF()
# scaled_cos_df.doStuff(...)

Масштабированный DataFrame (теперь со столбцом scaled_features) фактически переделывается, чтобы соответствовать немасштабированной служебной матрице, как описано в * 1005.*https://stackoverflow.com/questions/38384347/how-to-split-vector-into-columns-using-pyspark

В обоих подходах функция columnSimilarities() возвращает симметричное (в данном случае 1933x1933) pyspark.mllib.linalg.distributed.CoordinateMatrix.Доступ к этому базовому СДР CoordinateMatrix (cos_df.entries) в немасштабированном apraoch занимает примерно 20 секунд.Доступ к нему для масштабированного подхода просто создает задачи, которые не выполняются.Я в полном недоумении, почему это так.

РЕДАКТИРОВАТЬ : мой кластер является кластером Azure Databricks с двумя узлами, оба Azure D14 (16 ядер, 112 ГБ ОЗУ).Водитель такой же, как голова.Python 3, Spark 2.3.0.

...