Условная агрегация Spark DataFrame - PullRequest
0 голосов
/ 25 августа 2018

Я хотел бы понять, как лучше всего выполнить агрегацию в Spark в этом сценарии:

import sqlContext.implicits._  
import org.apache.spark.sql.functions._
case class Person(name:String, acc:Int, logDate:String)
val dateFormat = "dd/MM/yyyy"
val filterType = // Could has "MIN" or "MAX" depending on a run parameter
val filterDate = new Timestamp(System.currentTimeMillis)

val df = sc.parallelize(List(Person("Giorgio",20,"31/12/9999"),
                             Person("Giorgio",30,"12/10/2009")
                             Person("Diego",  10,"12/10/2010"),
                             Person("Diego",  20,"12/10/2010"),
                             Person("Diego",  30,"22/11/2011"), 
                             Person("Giorgio",10,"31/12/9999"),
                             Person("Giorgio",30,"31/12/9999"))).toDF()

val df2 = df.withColumn("logDate",unix_timestamp($"logDate",dateFormat).cast(TimestampType))

val df3 = df.groupBy("name").agg(/*conditional aggregation*/)
df3.show /*Expected output show  below */

В основном я хочу сгруппировать все записи по столбцу name, а затем на основе параметра filterType я хочу отфильтровать все действительные записи для персоны, затем после фильтрации я хочу суммировать все значения acc, полученные финал DataFrame с имя и totalAcc столбцы.

Например:

  • filterType = MIN , я хочу взять все записи, имеющие min (logDate), их может быть много, поэтому в основном в этом случае я полностью игнорирую параметр filterDate:

Diego,10,12/10/2010 Diego,20,12/10/2010 Giorgio,30,12/10/2009

Окончательный результат, ожидаемый от агрегации: (Диего, 30), (Джорджо, 30)

  • filterType = MAX , я хочу взять все записи с logDate> filterDate, у меня для ключа нет записей, относящихся к этому условию, мне нужно взять записи с min (logDate) как сделано в сценарии MIN, так:

Diego, 10, 12/10/2010 Diego, 20, 12/10/2010 Giorgio, 20, 31/12/9999 Giorgio, 10, 31/12/9999 Giorgio, 30, 31/12/9999

Окончательный результат, ожидаемый от агрегации: (Диего, 30), (Джорджо, 60) В этом случае для Диего у меня не было записей с logDate> logFilter, поэтому я отступил, чтобы применить сценарий MIN, взяв только для Диего все записи с min logDate.

Ответы [ 2 ]

0 голосов
/ 25 августа 2018

Вы можете написать свою условную агрегацию, используя when/otherwise как

df2.groupBy("name").agg(sum(when(lit(filterType) === "MIN" && $"logDate" < filterDate, $"acc").otherwise(when(lit(filterType) === "MAX" && $"logDate" > filterDate, $"acc"))).as("sum"))
    .filter($"sum".isNotNull)

, что даст вам желаемый результат в соответствии с filterType

Но

В конце концов вам потребуются оба агрегированных фрейма данных , поэтому я бы посоветовал вам избежать поля filterType и просто выполнить агрегирование путем создания дополнительного столбца для группировки с использованием функции when/otherwise.Таким образом, вы можете иметь оба агрегированных значения в одном кадре данных как

df2.withColumn("additionalGrouping", when($"logDate" < filterDate, "less").otherwise("more"))
    .groupBy("name", "additionalGrouping").agg(sum($"acc"))
    .drop("additionalGrouping")
    .show(false)

, которые будут выводиться как

+-------+--------+
|name   |sum(acc)|
+-------+--------+
|Diego  |10      |
|Giorgio|60      |
+-------+--------+

Обновлено

Поскольку вопрос обновляется с измененной логикойВот идея и решение измененного сценария

import org.apache.spark.sql.expressions._
def windowSpec = Window.partitionBy("name").orderBy($"logDate".asc)

val minDF = df2.withColumn("minLogDate", first("logDate").over(windowSpec)).filter($"minLogDate" === $"logDate")
  .groupBy("name")
  .agg(sum($"acc").as("sum"))

val finalDF =
  if(filterType == "MIN") {
    minDF
  }
  else if(filterType == "MAX"){
    val tempMaxDF = df2
      .groupBy("name")
      .agg(sum(when($"logDate" > filterDate,$"acc")).as("sum"))

    tempMaxDF.filter($"sum".isNull).drop("sum").join(minDF, Seq("name"), "left").union(tempMaxDF.filter($"sum".isNotNull))
  }
  else {
    df2
  }

, поэтому для filterType = MIN вы должны иметь

+-------+---+
|name   |sum|
+-------+---+
|Diego  |30 |
|Giorgio|30 |
+-------+---+

, а для filterType = MAX вы должны иметь

+-------+---+
|name   |sum|
+-------+---+
|Diego  |30 |
|Giorgio|60 |
+-------+---+

В случае, если filterType не MAX или MIN, возвращается исходный фрейм данных

Надеюсь, ответ будет полезным

0 голосов
/ 25 августа 2018

Вам не нужна условная агрегация. Просто фильтр:

df
  .where(if (filterType == "MAX") $"logDate" < filterDate else $"logDate" > filterDate)
  .groupBy("name").agg(sum($"acc")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...