pySpark DataFrame: как распараллелить сравнение столбцов двух фреймов данных? - PullRequest
0 голосов
/ 11 июля 2020

У меня есть два DataFrame, и я хочу применить distance.euclidean(df1.select(col),df2.select(col)) для каждого столбца из двух DataFrame.

Пример:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([(1,10),(2,13)],["A","B"])
df2 = spark.createDataFrame([(3,40),(2,20)],["A","B"])

# Apply distance function for each columns of `df1` and `df2`
from scipy.spatial import distance
for col in df1.columns:
    d = distance.euclidean(df1.select(col).rdd.flatMap(lambda x:x).collect(), df2.select(col).rdd.flatMap(lambda x:x).collect())
    print(col,d)

Количество столбцов большое, около 5,000. Есть ли какой-либо метод вычисления distance столбцов параллельно вместо того, чтобы вычислять один за другим с помощью функции for.

1 Ответ

1 голос
/ 12 июля 2020

Насколько я знаю, встроенной функции евклидова расстояния нет, но вы можете легко построить ее с помощью sum , pow , sqrt в качестве уравнение довольно простое:

euclidean distance equation

df1 = spark.createDataFrame([(1, 10, 1),(2, 13, 2), (3, 5, 3)], ["A", "B", "id"])
df2 = spark.createDataFrame([(3, 40, 1),(2, 20, 2), (3, 10, 3)],["A", "B", "id"])

df1 = df1.alias("df1")
df2 = df2.alias("df2")

df = df1.join(df2, 'id', 'inner')
df.show()

Output:

+---+---+---+---+---+
| id|  A|  B|  A|  B|
+---+---+---+---+---+
|  1|  1| 10|  3| 40|
|  3|  3|  5|  3| 10|
|  2|  2| 13|  2| 20|
+---+---+---+---+---+
expression = ['sqrt(sum(pow((df1.{col} - df2.{col}),2))) as {col}'.format(col=c) for c in df1.columns if c !='id']
print(expression)
df.selectExpr(expression).show()

Вывод:

['sqrt(sum(pow((df1.A - df2.A),2))) as A', 'sqrt(sum(pow((df1.B - df2.B),2))) as B']
+---+-----------------+
|  A|                B|
+---+-----------------+
|2.0|31.20897306865447|
+---+-----------------+

PS: collect следует использовать только когда фрейм данных мал, так как все данные загружаются в память вашего искрового драйвера.

...