Я хотел бы понять, как лучше всего выполнить агрегацию в Spark в этом сценарии:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
case class Person(name:String, acc:Int, logDate:String)
val dateFormat = "dd/MM/yyyy"
val filterType = // Could has "MIN" or "MAX" depending on a run parameter
val filterDate = new Timestamp(System.currentTimeMillis)
val df = sc.parallelize(List(Person("Giorgio",20,"31/12/9999"),
Person("Giorgio",30,"12/10/2009")
Person("Diego", 10,"12/10/2010"),
Person("Diego", 20,"12/10/2010"),
Person("Diego", 30,"22/11/2011"),
Person("Giorgio",10,"31/12/9999"),
Person("Giorgio",30,"31/12/9999"))).toDF()
val df2 = df.withColumn("logDate",unix_timestamp($"logDate",dateFormat).cast(TimestampType))
val df3 = df.groupBy("name").agg(/*conditional aggregation*/)
df3.show /*Expected output show below */
В основном я хочу сгруппировать все записи по столбцу name
, а затем на основе параметра filterType
я хочу отфильтровать все действительные записи для персоны, затем после фильтрации я хочу суммировать все значения acc
, полученные финал
DataFrame
с имя и totalAcc столбцы.
Например:
- filterType = MIN , я хочу взять все записи, имеющие min (logDate), их может быть много, поэтому в основном в этом случае я полностью игнорирую параметр filterDate:
Diego,10,12/10/2010
Diego,20,12/10/2010
Giorgio,30,12/10/2009
Окончательный результат, ожидаемый от агрегации: (Диего, 30), (Джорджо, 30)
- filterType = MAX , я хочу взять все записи с logDate> filterDate, у меня для ключа нет записей, относящихся к этому условию, мне нужно взять записи с min (logDate) как сделано в сценарии MIN, так:
Diego, 10, 12/10/2010
Diego, 20, 12/10/2010
Giorgio, 20, 31/12/9999
Giorgio, 10, 31/12/9999
Giorgio, 30, 31/12/9999
Окончательный результат, ожидаемый от агрегации: (Диего, 30), (Джорджо, 60)
В этом случае для Диего у меня не было записей с logDate> logFilter, поэтому я отступил, чтобы применить сценарий MIN, взяв только для Диего все записи с min logDate.