Spark Scala по среднему ряду, обрабатывая ноль - PullRequest
0 голосов
/ 07 ноября 2018

У меня есть фрейм данных с большим объемом данных и числом столбцов "n".

df_avg_calc: org.apache.spark.sql.DataFrame = [col1: double, col2: double ... 4 more fields]
+------------------+-----------------+------------------+-----------------+-----+-----+
|              col1|             col2|              col3|             col4| col5| col6|
+------------------+-----------------+------------------+-----------------+-----+-----+
|              null|             null|              null|             null| null| null|
|              14.0|              5.0|              73.0|             null| null| null|
|              null|             null|             28.25|             null| null| null|
|              null|             null|              null|             null| null| null|
|33.723333333333336|59.78999999999999|39.474999999999994|82.09666666666666|101.0|53.43|
|             26.25|             null|              null|              2.0| null| null|
|              null|             null|              null|             null| null| null|
|             54.46|           89.475|              null|             null| null| null|
|              null|            12.39|              null|             null| null| null|
|              null|             58.0|             19.45|              1.0| 1.33|158.0|
+------------------+-----------------+------------------+-----------------+-----+-----+

Мне нужно выполнить среднее по ряду, чтобы не усреднять ячейку с «нулем».

Это необходимо реализовать в Spark / Scala. Я пытался объяснить так же, как на прилагаемом изображении

rowise average

Что я пробовал до сих пор:

По ссылке - Рассчитать среднее значение строки, игнорируя NA в Spark Scala

val df = df_raw.schema.fieldNames.filter(f => f.contains("colname")) 
val rowMeans = df_raw.select(df.map(f => col(f)).reduce(+) / lit(df.length) as "row_mean") 

Где df_raw содержит столбцы, которые должны быть агрегированы (конечно, по очереди). Есть более 80 столбцов. Произвольно у них есть данные и значение NULL, в знаменателе необходимо игнорировать значение Null при расчете среднего. Это работает нормально, когда все столбцы содержат данные, даже один нуль в столбце возвращает ноль

Редактировать :

Я пытался настроить этот ответ от Терри Дактил

def average(l: Seq[Double]): Option[Double] = {
  val nonNull = l.flatMap(i => Option(i))
  if(nonNull.isEmpty) None else Some(nonNull.reduce(_ + _).toDouble / nonNull.size.toDouble)
}

val avgUdf = udf(average(_: Seq[Double]))

val rowAvgDF = df_avg_calc.select(avgUdf(array($"col1",$"col2",$"col3",$"col4",$"col5",$"col6")).as("row_avg"))
rowAvgDF.show(10,false)

rowAvgDF: org.apache.spark.sql.DataFrame = [row_avg: double]
+------------------+
|row_avg           |
+------------------+
|0.0               |
|15.333333333333334|
|4.708333333333333 |
|0.0               |
|61.58583333333333 |
|4.708333333333333 |
|0.0               |
|23.989166666666666|
|2.065             |
|39.63             |
+------------------+

Ответы [ 2 ]

0 голосов
/ 07 ноября 2018

Искра> = 2,4

Можно использовать aggregate:

val row_mean = expr("""aggregate(
  CAST(array(_1, _2, _3) AS array<double>), 
  -- Initial value
  -- Note that aggregate is picky about types
  CAST((0.0 as sum, 0.0 as n) AS struct<sum: double, n: double>), 
  -- Merge function
  (acc, x) -> (
    acc.sum + coalesce(x, 0.0), 
    acc.n + CASE WHEN x IS NULL THEN 0.0 ELSE 1.0 END), 
  -- Finalize function
  acc -> acc.sum / acc.n)""")

Использование:

df.withColumn("row_mean", row_mean).show

Результат:

+----+----+----+--------+
|  _1|  _2|  _3|row_mean|
+----+----+----+--------+
|null|null|null|    null|
| 2.0|null|null|     2.0|
|50.0|34.0|null|    42.0|
| 1.0| 2.0| 3.0|     2.0|
+----+----+----+--------+

Независимая от версии

Вычислить сумму и количество столбцов NOT NULL и разделить одно на другое:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

def row_mean(cols: Column*) = {
  // Sum of values ignoring nulls
  val sum = cols
    .map(c => coalesce(c, lit(0)))
    .foldLeft(lit(0))(_ + _)
  // Count of not null values
  val cnt = cols
    .map(c => when(c.isNull, 0).otherwise(1))
    .foldLeft(lit(0))(_ + _)
  sum / cnt
}

Пример данных:

val df = Seq(
  (None, None, None), 
  (Some(2.0), None, None),
  (Some(50.0), Some(34.0), None),
  (Some(1.0), Some(2.0), Some(3.0))
).toDF

Результат:

df.withColumn("row_mean", row_mean($"_1", $"_2", $"_3")).show
+----+----+----+--------+
|  _1|  _2|  _3|row_mean|
+----+----+----+--------+
|null|null|null|    null|
| 2.0|null|null|     2.0|
|50.0|34.0|null|    42.0|
| 1.0| 2.0| 3.0|     2.0|
+----+----+----+--------+
0 голосов
/ 07 ноября 2018
def average(l: Seq[Integer]): Option[Double] = {
  val nonNull = l.flatMap(i => Option(i))
  if(nonNull.isEmpty) None else Some(nonNull.reduce(_ + _).toDouble / nonNull.size.toDouble)
}

val avgUdf = udf(average(_: Seq[Integer]))

val df = List((Some(1),Some(2)), (Some(1), None), (None, None)).toDF("a", "b")

val avgDf = df.select(avgUdf(array(df.schema.map(c => col(c.name)): _*)).as("average"))

avgDf.collect

res0: Array[org.apache.spark.sql.Row] = Array([1.5], [1.0], [null])

Проверка предоставленных вами данных дает правильный результат:

val df = List(
  (Some(10),Some(5), Some(5), None, None),
  (None, Some(5), Some(5), None, Some(5)),
  (Some(2), Some(8), Some(5), Some(1), Some(2)), 
  (None, None, None, None, None)
).toDF("col1", "col2", "col3", "col4", "col5")

Array[org.apache.spark.sql.Row] = Array([6.666666666666667], [5.0], [3.6], [null])

Обратите внимание, что если у вас есть столбцы, которые вы не хотите включать, убедитесь, что они фильтруются при заполнении массива, передаваемого в UDF.

Наконец:

val df = List(
  (Some(14), Some(5), Some(73), None.asInstanceOf[Option[Integer]], None.asInstanceOf[Option[Integer]], None.asInstanceOf[Option[Integer]])
).toDF("col1", "col2", "col3", "col4", "col5", "col6")

Array[org.apache.spark.sql.Row] = Array([30.666666666666668])

Что снова является правильным результатом.

Если вы хотите использовать Doubles ...

def average(l: Seq[java.lang.Double]): Option[java.lang.Double] = {
  val nonNull = l.flatMap(i => Option(i))
  if(nonNull.isEmpty) None else Some(nonNull.reduce(_ + _) / nonNull.size.toDouble)
}

val avgUdf = udf(average(_: Seq[java.lang.Double]))

val df = List(
  (Some(14.0), Some(5.0), Some(73.0), None.asInstanceOf[Option[java.lang.Double]], None.asInstanceOf[Option[java.lang.Double]], None.asInstanceOf[Option[java.lang.Double]])
).toDF("col1", "col2", "col3", "col4", "col5", "col6")

val avgDf = df.select(avgUdf(array(df.schema.map(c => col(c.name)): _*)).as("average"))

avgDf.collect

Array[org.apache.spark.sql.Row] = Array([30.666666666666668]) 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...