Мы используем метод ALS (Alternating Least Squares) в нашей облачной искровой среде Google, чтобы рекомендовать некоторые компании нашим пользователям.Для рекомендации мы используем этот кортеж (userId, companyId, rating), а значение рейтинга состоит из комбинации интересов пользователя, таких как нажатие на страницу компании, добавление компании в список избранных, выполнение заказа от компании и т. Д. (Наш методочень похож на эту ссылку: https://cloud.google.com/solutions/recommendations-using-machine-learning-on-compute-engine#Training-the-models)
И результаты довольно хорошие и работают для нашего бизнес-кейса, однако нам не хватает одной важной для нас вещи. Нам нужно узнать, какие пользователи группируются какСхожие интересы (или соседи). Знаете ли вы, есть ли способ получить группированных пользователей из алгоритма ALS pyspark? Таким образом, мы сможем пометить пользователей в соответствии с этой группировкой
Редактировать:
Я попробовал код ответа, приведенный ниже, но результаты странные, мои данные спарены следующим образом (userId, companyId, rating). Когда я запускаю приведенный ниже код, он группирует пользователей без общего companyId втот же clusterId Например, один из результатов приведенного ниже кода: (userId: 471, clusterId: 2) (используйтекод: 490, идентификатор кластера: 2)
Однако пользователи 471 и 490 не имеют ничего общего.Я думаю, что здесь есть ошибка
from __future__ import print_function
import sys
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import IntegerType
from pyspark.mllib.clustering import KMeans, KMeansModel
conf = SparkConf().setAppName("user_clustering")
sc = SparkContext(conf=conf)
sc.setCheckpointDir('checkpoint/')
sqlContext = SQLContext(sc)
CLOUDSQL_INSTANCE_IP = sys.argv[1]
CLOUDSQL_DB_NAME = sys.argv[2]
CLOUDSQL_USER = sys.argv[3]
CLOUDSQL_PWD = sys.argv[4]
BEST_RANK = int(sys.argv[5])
BEST_ITERATION = int(sys.argv[6])
BEST_REGULATION = float(sys.argv[7])
TABLE_ITEMS = "companies"
TABLE_RATINGS = "ml_ratings"
TABLE_RECOMMENDATIONS = "ml_reco"
TABLE_USER_CLUSTERS = "ml_user_clusters"
# Read the data from the Cloud SQL
# Create dataframes
#[START read_from_sql]
jdbcUrl = 'jdbc:mysql://%s:3306/%s?user=%s&password=%s' % (CLOUDSQL_INSTANCE_IP, CLOUDSQL_DB_NAME, CLOUDSQL_USER, CLOUDSQL_PWD)
dfAccos = sqlContext.read.jdbc(url=jdbcUrl, table=TABLE_ITEMS)
dfRates = sqlContext.read.jdbc(url=jdbcUrl, table=TABLE_RATINGS)
print("Start Clustering Users")
# print("User Ratings:")
# dfRates.show(100)
#[END read_from_sql]
# Get all the ratings rows of our user
# print("Filtered User Ratings For User:",USER_ID)
# print("------------------------------")
# for x in dfUserRatings:
# print(x)
#[START split_sets]
rddTraining, rddValidating, rddTesting = dfRates.rdd.randomSplit([6,2,2])
print("RDDTraining Size:",rddTraining.count()," RDDValidating Size:",rddValidating.count()," RDDTesting Size:",rddTesting.count())
print("Rank:",BEST_RANK," Iteration:",BEST_ITERATION," Regulation:",BEST_REGULATION)
#print("RDD Training Values:",rddTraining.collect())
#[END split_sets]
print("Start predicting")
#[START predict]
# Build our model with the best found values
# Rating, Rank, Iteration, Regulation
model = ALS.train(rddTraining, BEST_RANK, BEST_ITERATION, BEST_REGULATION)
# print("-----------------")
# print("User Groups Are Created")
# print("-----------------")
user_features = model.userFeatures().map(lambda x: x[1])
related_users = model.userFeatures().map(lambda x: x[0])
number_of_clusters = 10
model_kmm = KMeans.train(user_features, number_of_clusters, initializationMode = "random", runs = 3)
user_features_with_cluster_id = model_kmm.predict(user_features)
user_features_with_related_users = related_users.zip(user_features_with_cluster_id)
clusteredUsers = user_features_with_related_users.map(lambda x: (x[0],x[1]))
orderedUsers = clusteredUsers.takeOrdered(200,key = lambda x: x[1])
print("Ordered Users:")
print("--------------")
for x in orderedUsers:
print(x)
#[START save user groups]
userGroupSchema = StructType([StructField("primaryUser", IntegerType(), True), StructField("groupId", IntegerType(), True)])
dfUserGroups = sqlContext.createDataFrame(orderedUsers,userGroupSchema)
try:
dfUserGroups.write.jdbc(url=jdbcUrl, table=TABLE_USER_CLUSTERS, mode='append')
except:
print("Data is already written to DB")
print("Written to DB and Finished Job")
Спасибо