def average(l: Seq[Integer]): Option[Double] = {
val nonNull = l.flatMap(i => Option(i))
if(nonNull.isEmpty) None else Some(nonNull.reduce(_ + _).toDouble / nonNull.size.toDouble)
}
val avgUdf = udf(average(_: Seq[Integer]))
val df = List((Some(1),Some(2)), (Some(1), None), (None, None)).toDF("a", "b")
val avgDf = df.select(avgUdf(array(df.schema.map(c => col(c.name)): _*)).as("average"))
avgDf.collect
res0: Array[org.apache.spark.sql.Row] = Array([1.5], [1.0], [null])
Проверка предоставленных вами данных дает правильный результат:
val df = List(
(Some(10),Some(5), Some(5), None, None),
(None, Some(5), Some(5), None, Some(5)),
(Some(2), Some(8), Some(5), Some(1), Some(2)),
(None, None, None, None, None)
).toDF("col1", "col2", "col3", "col4", "col5")
Array[org.apache.spark.sql.Row] = Array([6.666666666666667], [5.0], [3.6], [null])
Обратите внимание, что если у вас есть столбцы, которые вы не хотите включать, убедитесь, что они фильтруются при заполнении массива, передаваемого в UDF.
Наконец:
val df = List(
(Some(14), Some(5), Some(73), None.asInstanceOf[Option[Integer]], None.asInstanceOf[Option[Integer]], None.asInstanceOf[Option[Integer]])
).toDF("col1", "col2", "col3", "col4", "col5", "col6")
Array[org.apache.spark.sql.Row] = Array([30.666666666666668])
Что снова является правильным результатом.
Если вы хотите использовать Doubles ...
def average(l: Seq[java.lang.Double]): Option[java.lang.Double] = {
val nonNull = l.flatMap(i => Option(i))
if(nonNull.isEmpty) None else Some(nonNull.reduce(_ + _) / nonNull.size.toDouble)
}
val avgUdf = udf(average(_: Seq[java.lang.Double]))
val df = List(
(Some(14.0), Some(5.0), Some(73.0), None.asInstanceOf[Option[java.lang.Double]], None.asInstanceOf[Option[java.lang.Double]], None.asInstanceOf[Option[java.lang.Double]])
).toDF("col1", "col2", "col3", "col4", "col5", "col6")
val avgDf = df.select(avgUdf(array(df.schema.map(c => col(c.name)): _*)).as("average"))
avgDf.collect
Array[org.apache.spark.sql.Row] = Array([30.666666666666668])