Рекомендации для подмножества пользователей, использующих Pyspark mllib ALS / MatrixFactorizationModel - PullRequest
2 голосов
/ 21 ноября 2019

Я построил модель, используя следующий код:

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

model1 = ALS.train(ratings=ratingsR, rank=model_params['rank'], \
                   iterations=model_params['iterations'], lambda_=model_params['lambda'], \
                   blocks=model_params['blocks'], nonnegative=model_params['nonnegative'], \
                   seed=model_params['seed'])

Теперь я хочу прогнозировать кампании для всех пользователей (или подмножество пользователей), используя распределенную среду, предоставляемую spark.

Я попробовал recommendProductsForUsers, что требует от меня 32M Users X 4000 продуктов.

preds = model1.recommendProductsForUsers(num=4000)

Мне действительно не нужны рекомендации для всех 32M пользователей. У меня тоже все в порядке с 100k-200k.

Итак, чтобы изменить свой процесс, я попробовал способ обработки в формате spark udf для каждого пользователя по одному, но с использованием механизма распределения кластера spark:

import pyspark.sql.functions as F
def udf_preds(sameModel):
    return F.udf(lambda x: get_predictions(x, sameModel))

def get_predictions(x, sameModel):
    preds = sameModel.recommendProducts(user=x, num=4000) # per user it takes around 4s
    return preds

test.withColumn('predictions', udf_preds(model1)(F.col('user_id')))

Тест содержит около 200 000 пользователей. Вышеприведенная ошибка завершается следующей ошибкой:

PicklingError: Не удалось сериализовать объект: Исключение: создается впечатление, что вы пытаетесь сослаться на SparkContext из широковещательной переменной, действия или преобразования. SparkContext может использоваться только в драйвере, а не в коде, который он запускает на рабочих. Для получения дополнительной информации см. SPARK-5063.

Как лучше выполнить рекомендации для подмножества пользователей?

(РЕДАКТИРОВАТЬ)

Из ответа @ piscall. Я пытался сделать то же самое, используя RDD.

preds_rdd = test.rdd.map(lambda x: (x.user_id, sameModel.recommendProducts(x.user_id, 4000)))
preds_rdd.take(2)
 File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 330, in __getnewargs__
    "It appears that you are attempting to reference SparkContext from a broadcast "
 Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

 PicklingErrorTraceback (most recent call last)
<ipython-input-17-e114800a26e7> in <module>()
----> 1 preds_rdd.take(2)

 /usr/hdp/current/spark2-client/python/pyspark/rdd.py in take(self, num)
   1356 
   1357             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1358             res = self.context.runJob(self, takeUpToNumLeft, p)
   1359 
   1360             items += res

 /usr/hdp/current/spark2-client/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
   1038         # SparkContext#runJob.
   1039         mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1040         sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   1041         return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
   1042 

 /usr/hdp/current/spark2-client/python/pyspark/rdd.py in _jrdd(self)
   2470 
   2471         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2472                                       self._jrdd_deserializer, profiler)
   2473         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
   2474                                              self.preservesPartitioning)

 /usr/hdp/current/spark2-client/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
   2403     assert serializer, "serializer should not be empty"
   2404     command = (func, profiler, deserializer, serializer)
-> 2405     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
   2406     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
   2407                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

 /usr/hdp/current/spark2-client/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
   2389     # the serialized command will be compressed by broadcast
   2390     ser = CloudPickleSerializer()
-> 2391     pickled_command = ser.dumps(command)
   2392     if len(pickled_command) > (1 << 20):  # 1M
   2393         # The broadcast will have same life cycle as created PythonRDD

 /usr/hdp/current/spark2-client/python/pyspark/serializers.py in dumps(self, obj)
    573 
    574     def dumps(self, obj):
--> 575         return cloudpickle.dumps(obj, 2)
    576 
    577 

 /usr/hdp/current/spark2-client/python/pyspark/cloudpickle.py in dumps(obj, protocol)
    916 
    917     cp = CloudPickler(file,protocol)
--> 918     cp.dump(obj)
    919 
    920     return file.getvalue()

/u
I have built a model using the below code:

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
sr/hdp/current/spark2-client/python/pyspark/cloudpickle.py in dump(self, obj)
    247                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    248             print_exec(sys.stderr)
--> 249             raise pickle.PicklingError(msg)
    250 
    251 

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Ответы [ 3 ]

3 голосов
/ 27 ноября 2019

Вы можете попробовать следующее:

"""
Convert the test df to rdd, and then map a function that returns 
(user_id, array(Rating(user, product, rating)))
"""
preds_rdd = test.rdd.map(lambda x: (x.user_id, sameModel.recommendProducts(x.user_id, 4000)))

# Convert (user_id, array(Rating(user, product, rating))) to 
# (user_id, array(product_names))
preds_rdd2 = preds_rdd.map(lambda row: row[0], [x.product for x in row[1])
# Convert above RDD to DF with user_id and predicted_products columns
preds_df = preds_rdd2.toDF(["user_id", "predicted_products"])

Я не проверял, но это прямо из документации по здесь :

Затем вы можете присоединиться к нему. вернуться к исходному фрейму данных или сохранить столбцы.

Затем вы можете разбить массив продуктов на разные строки с помощью explode(), если вам нужно.

2 голосов
/ 02 декабря 2019

я бы использовал ForegnateAll метод. Предположим, что df_products - это фрейм данных, содержащий все 4000 продуктов, а df_users - фрейм данных с выбранными пользователями 100-200K, затем выполните перекрестный переход, чтобы найти все комбинации двух наборов данных для формирования тестовых данных. , затем используйте предикат, который выдаст объекты рейтинга выбранных пользователей по 4000 товарам:

from pyspark.sql.functions import broadcast
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

testdata = broadcast(df_products).crossJoin(df_users).select('user', 'product').rdd.map(tuple)

model.predictAll(testdata).collect()

Используйте пример из документации , в которой есть 4 товара (1,2,3,4) и 4 пользователя (1,2,3,4):

df_products.collect()                                                                                              
# [Row(product=1), Row(product=3), Row(product=2), Row(product=4)]

# a subset of all users:
df_users.collect()                                                                                                 
# [Row(user=1), Row(user=3)]

testdata.collect()                                                                                                 
# [(1, 1), (1, 3), (1, 2), (1, 4), (3, 1), (3, 3), (3, 2), (3, 4)]

model.predictAll(testdata).collect()
#[Rating(user=1, product=4, rating=0.9999459747142155),
# Rating(user=3, product=4, rating=4.99555263974573),
# Rating(user=1, product=1, rating=4.996821463543848),
# Rating(user=3, product=1, rating=1.000199620693615),
# Rating(user=1, product=3, rating=4.996821463543848),
# Rating(user=3, product=3, rating=1.000199620693615),
# Rating(user=1, product=2, rating=0.9999459747142155),
# Rating(user=3, product=2, rating=4.99555263974573)]

Примечание: может потребоваться отключить пользователей, которых нет в существующей модели, до создания testdataи обрабатывать их отдельно.

1 голос
/ 25 ноября 2019

как насчет preds = sameModel.predict(x), я не знаю spark и даже scala, но именно так мы делаем в python, и я думаю, что это то же самое для spark. Если вы хотите предсказать с помощью подвыборок x, то вы можете сделать что-то вроде: preds = sameModel.predict(x[0:200,::]) или стих стихов для столбцов.

...