Ваша проблема может стать довольно сложной, если вы хотите правильно ее решить, но здесь у вас есть пример кода в pyspark, который, как мы надеемся, поможет вам начать работу.
Сначала небольшой набор данных,
tinydata = sqlContext.createDataFrame(
[
(3527524, 'aamir', 'al malik', 'aamir.almalik@gmail.com'),
(4287983, 'aamir', 'al malik', 'aamir.almalik@company.com'),
(200490, 'aamir', 'al malik', 'aamir.almalik@gmail.come'),
(1906639, 'tahir', 'al malik', 'tahir.almalik@gmail.com')
],
['ID', 'first_NAME', 'last_NAME', 'EMAIL']
)
Затем вы конвертируете его в матрицу разностей через cross-join
. Обратите внимание, что если у вас есть 5 миллионов, это станет огромным. Вы должны избегать сравнений в максимально возможной степени, таких как следование некоторым комментариям к вашему вопросу и другим идеям, которые вы можете придумать. Обратите внимание, что последний фильтр состоит в том, чтобы избегать сравнения двух строк дважды.
matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
.crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
.filter(F.col('ID1') > F.col('ID2'))
После этого вы можете вычислять расстояния.
def lev_dist(left, right):
return Levenshtein.distance(left, right)
lev_dist_udf = udf(lev_dist, IntegerType())
res = matrix.withColumn('d', lev_dist_udf(F.col('EMAIL1'), F.col('EMAIL2')))
На крошечном примере вы получите
res.show()
+-------+--------------------+-------+--------------------+---+
| ID1| EMAIL1| ID2| EMAIL2| d|
+-------+--------------------+-------+--------------------+---+
|3527524|aamir.almalik@gma...| 200490|aamir.almalik@gma...| 1|
|3527524|aamir.almalik@gma...|1906639|tahir.almalik@gma...| 2|
|4287983|aamir.almalik@com...|3527524|aamir.almalik@gma...| 5|
|4287983|aamir.almalik@com...| 200490|aamir.almalik@gma...| 6|
|4287983|aamir.almalik@com...|1906639|tahir.almalik@gma...| 7|
|1906639|tahir.almalik@gma...| 200490|aamir.almalik@gma...| 3|
+-------+--------------------+-------+--------------------+---+
Спасибо, что указали на @ cronoik
Нет необходимости в udf, должно быть что-то вроде этого:
from pyspark.sql.functions import levenshtein
matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
.crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
.filter(F.col('ID1') > F.col('ID2'))
res = matrix.withColumn('d', levenshtein(F.col('EMAIL1'), F.col('EMAIL2')))