Я не думаю, что вам нужен UDF для этого - я думаю, вы должны быть в состоянии взять разницу между двумя столбцами (df.withColumn('difference', col('true') - col('pred'))
), затем вычислить квадрат этого столбца (df.withColumn('squared_difference', pow(col('difference'), lit(2).astype(IntegerType()))
) и вычислить среднее значение столбца (df.withColumn('rmse', avg('squared_difference'))
). Собираем все вместе с примером:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
spark = SparkSession.builder.getOrCreate()
sql_context = SQLContext(spark.sparkContext)
df = sql_context.createDataFrame([(0.0, 1.0),
(1.0, 2.0),
(3.0, 5.0),
(1.0, 8.0)], schema=['true', 'predicted'])
df = df.withColumn('difference', F.col('true') - F.col('predicted'))
df = df.withColumn('squared_difference', F.pow(F.col('difference'), F.lit(2).astype(IntegerType())))
rmse = df.select(F.avg(F.col('squared_difference')).alias('rmse'))
print(df.show())
print(rmse.show())
Вывод:
+----+---------+----------+------------------+
|true|predicted|difference|squared_difference|
+----+---------+----------+------------------+
| 0.0| 1.0| -1.0| 1.0|
| 1.0| 2.0| -1.0| 1.0|
| 3.0| 5.0| -2.0| 4.0|
| 1.0| 8.0| -7.0| 49.0|
+----+---------+----------+------------------+
+-----+
| rmse|
+-----+
|13.75|
+-----+
Надеюсь, это поможет!
Редактировать
Извините, я забыл возьмите квадрат root результата - последняя строка становится:
rmse = df.select(F.sqrt(F.avg(F.col('squared_difference'))).alias('rmse'))
и вывод становится:
+------------------+
| rmse|
+------------------+
|3.7080992435478315|
+------------------+