Spark Python: Как рассчитать сходство Жакара между каждой строкой в ​​СДР - PullRequest
0 голосов
/ 22 октября 2018

У меня есть таблица из 50 тысяч различных строк и 2 столбцов.Можно представить, что каждая строка является фильмом, а столбцы - атрибутами этого фильма - «ID»: идентификатор этого фильма, «Теги»: некоторые теги содержимого фильма, в виде списка строк длякаждый фильм .

Данные выглядят примерно так:

movie_1, ['romantic', 'comedy', 'English'];movie_2, ['action', 'kongfu', 'Chinese']

Моя цель - сначала вычислить жаккардовое сходство между каждым фильмом на основе их соответствующих тегов, и как только это будет сделано, я смогу узнатьдля каждого фильма (например, я выбираю movie_1), какие другие top 5 наиболее похожи фильмы, как с этим (в данном случае movie_1).И я хочу получить 5 лучших результатов не только для самого movie_1, но и для получения 5 лучших для всех фильмов .

Я пытался использовать Python для решения проблемы, однако прогонВремя здесь - большая проблема.Даже когда я использовал многопроцессорность, работающую на 6 ядрах, общее время работы продолжалось более 20 часов.

Код Python ниже:

import pandas as pd
from collections import Counter
import numpy as np
from multiprocessing import Pool
import time

col_names=['movie_id','tag_name']
df=pd.read_csv("movies.csv",names=col_names)
movie_ids=df['movie_id'].tolist()
tag_list=df['tag_name'].tolist()

def jaccard_similarity(string1, string2):
    intersection = set(string1).intersection(set(string2))
    union = set(string1).union(set(string2))
    return len(intersection)/float(len(union))

def jc_results(movie_id):
    result=Counter()
    this_index=movie_ids.index(movie_id)
    for another_id in movie_ids:
        that_index=movie_ids.index(another_id)
        if another_id==movie_id:
            continue
        else:
            tag_1=tag_list[this_index]
            tag_2=tag_list[that_index]
            jaccard = jaccard_similarity(tag_1,tag_2)
            result[(movie_id,another_id)]=jaccard
    return result.most_common(10)


from multiprocessing import Pool
pool=Pool(6)
results={}
for movie_id in movie_ids:
    results[movie_id]=pool.apply_async(jc_results,args=(movie_id,))
pool.close()
pool.join()
for movie_id, res in results.items():
    results[movie_id] = res.get()

Затем я захотел переключиться на Pyspark, однако я все еще очень плохо знаком с python и застрял после того, как написал несколько строк, на самом деле я только прогрессируюmade читал данные в RDD с помощью sc.textFile ... Прочитал существующие посты, но все они используют Scala ... Было бы здорово, если кто-то может помочь или дать какие-либо рекомендации в Pyspark.Большое спасибо!

1 Ответ

0 голосов
/ 12 июня 2019

Вы можете попробовать решение, подобное этому ответу stackoverflow , хотя, поскольку ваши данные уже разбиты на токены (список строк), вам не нужно будет делать этот шаг или шаг ngram.

Я также упомяну, что приближение приближенияJacin в pyspark вычисляет расстояние Жакара, а не сходство Жакара, но вы можете просто вычесть из 1, чтобы преобразовать обратно в Сходство, если вам это нужно, в частности.

Ваш код будет выглядеть примерно так:

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH
import pyspark.sql.functions as f

db = spark.createDataFrame([
        ('movie_1', ['romantic','comedy','English']),
        ('movie_2', ['action','kongfu','Chinese']),
        ('movie_3', ['romantic', 'action'])
    ], ['movie_id', 'genres'])


model = Pipeline(stages=[
        HashingTF(inputCol="genres", outputCol="vectors"),
        MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=10)
    ]).fit(db)

db_hashed = model.transform(db)

db_matches = model.stages[-1].approxSimilarityJoin(db_hashed, db_hashed, 0.9)

#show all matches (including duplicates)
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).show()

#show non-duplicate matches
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).filter('movie_id_A < movie_id_B').show()

с соответствующим выводом:

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_3|   movie_3|    0.0|
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
|   movie_1|   movie_1|    0.0|
|   movie_2|   movie_2|    0.0|
|   movie_3|   movie_2|   0.75|
|   movie_3|   movie_1|   0.75|
+----------+----------+-------+

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
+----------+----------+-------+
...