Динамически применять агрегатную функцию в кадре данных SPARK - PullRequest
0 голосов
/ 04 июля 2019

Как мне параметризировать приведенную ниже функцию зажигания. Значения groupBy и Pivot являются постоянными. Мне нужно параметризовать

 var final_df_transpose=df_transpose.groupBy("_id").pivot("Type").agg(first("Value").alias("Value"),first("OType").alias("OType"),first("DateTime").alias("DateTime"))

Невозможно передать псевдоним как динамически в приведенном выше сценарии.

    agg_Map scala.collection.mutable.Map[String,String] = Map( OType -> first, Type -> first, Value -> first, DateTime -> first)

var agg_Map = collection.mutable.Map[String, String]()
for (aggDataCol <- fin_agg_col) {
    agg_Map1 += (aggDataCol -> "first")
  }
df_transpose.groupBy("_id").pivot("Type").agg(agg_Map.toMap).show 

1 Ответ

0 голосов
/ 05 июля 2019

Я могу придумать два пути, но я тоже не доволен.

Во-первых, определите ваши агрегаты как список Column s. Раздражает то, что для удовлетворения сигнатуры метода необходимо добавить фиктивный столбец, а затем удалить его после агрегирования:

scala> val in = spark.read.option("header", true).csv("""_id,Type,Value,OType,DateTime
     | 0,a,b,c,d
     | 1,aaa,bbb,ccc,ddd""".split("\n").toSeq.toDS)
in: org.apache.spark.sql.DataFrame = [_id: string, Type: string ... 3 more fields]

scala> in.show
+---+----+-----+-----+--------+
|_id|Type|Value|OType|DateTime|
+---+----+-----+-----+--------+
|  0|   a|    b|    c|       d|
|  1| aaa|  bbb|  ccc|     ddd|
+---+----+-----+-----+--------+

scala> val aggColumns = Seq("Value", "OType", "DateTime").map{c => first(c).alias(c)}
aggColumns: Seq[org.apache.spark.sql.Column] = List(first(Value, false) AS `Value`, first(OType, false) AS `OType`, first(DateTime, false) AS `DateTime`)
scala> val df_intermediate = in.groupBy("_id").pivot("Type").agg(lit("dummy"), aggColumns : _*)
df_intermediate: org.apache.spark.sql.DataFrame = [_id: string, a_dummy: string ... 7 more fields]

scala> df_intermediate.show
+---+-------+-------+-------+----------+---------+---------+---------+------------+
|_id|a_dummy|a_Value|a_OType|a_DateTime|aaa_dummy|aaa_Value|aaa_OType|aaa_DateTime|
+---+-------+-------+-------+----------+---------+---------+---------+------------+
|  0|  dummy|      b|      c|         d|    dummy|     null|     null|        null|
|  1|  dummy|   null|   null|      null|    dummy|      bbb|      ccc|         ddd|
+---+-------+-------+-------+----------+---------+---------+---------+------------+

scala> val df_final = df_intermediate.drop(df_intermediate.schema.collect{case c if c.name.endsWith("_dummy") => c.name} : _*)
df_final: org.apache.spark.sql.DataFrame = [_id: string, a_Value: string ... 5 more fields]

scala> df_final.show
+---+-------+-------+----------+---------+---------+------------+
|_id|a_Value|a_OType|a_DateTime|aaa_Value|aaa_OType|aaa_DateTime|
+---+-------+-------+----------+---------+---------+------------+
|  0|      b|      c|         d|     null|     null|        null|
|  1|   null|   null|      null|      bbb|      ccc|         ddd|
+---+-------+-------+----------+---------+---------+------------+

Во втором продолжите использовать Map выражений agg, затем используйте регулярное выражение, чтобы найти переименованные столбцы и изменить их обратно:

scala> val aggExprs = Map(("OType" -> "first"), ("Value" -> "first"), "DateTime" -> "first")
aggExprs: scala.collection.immutable.Map[String,String] = Map(OType -> first, Value -> first, DateTime -> first)

scala> val df_intermediate = in.groupBy("_id").pivot("Type").agg(aggExprs)
df_intermediate: org.apache.spark.sql.DataFrame = [_id: string, a_first(OType, false): string ... 5 more fields]

scala> df_intermediate.show
+---+---------------------+---------------------+------------------------+-----------------------+-----------------------+--------------------------+
|_id|a_first(OType, false)|a_first(Value, false)|a_first(DateTime, false)|aaa_first(OType, false)|aaa_first(Value, false)|aaa_first(DateTime, false)|
+---+---------------------+---------------------+------------------------+-----------------------+-----------------------+--------------------------+
|  0|                    c|                    b|                       d|                   null|                   null|                      null|
|  1|                 null|                 null|                    null|                    ccc|                    bbb|                       ddd|
+---+---------------------+---------------------+------------------------+-----------------------+-----------------------+--------------------------+


scala> val regex = "^(.*)_first\\((.*), false\\)$".r
regex: scala.util.matching.Regex = ^(.*)_first\((.*), false\)$

scala> val replacements = df_intermediate.schema.collect{ case c if regex.findFirstMatchIn(c.name).isDefined => 
     | val regex(pivotVal, colName) = c.name
     | c.name -> s"${pivotVal}_$colName"
     | }.toMap
replacements: scala.collection.immutable.Map[String,String] = Map(a_first(DateTime, false) -> a_DateTime, aaa_first(DateTime, false) -> aaa_DateTime, aaa_first(OType, false) -> aaa_OType, a_first(Value, false) -> a_Value, a_first(OType, false) -> a_OType, aaa_first(Value, false) -> aaa_Value)

scala> val df_final = replacements.foldLeft(df_intermediate){(df, c) => df.withColumnRenamed(c._1, c._2)}
df_final: org.apache.spark.sql.DataFrame = [_id: string, a_OType: string ... 5 more fields]

scala> df_final.show
+---+-------+-------+----------+---------+---------+------------+
|_id|a_OType|a_Value|a_DateTime|aaa_OType|aaa_Value|aaa_DateTime|
+---+-------+-------+----------+---------+---------+------------+
|  0|      c|      b|         d|     null|     null|        null|
|  1|   null|   null|      null|      ccc|      bbb|         ddd|
+---+-------+-------+----------+---------+---------+------------+

Сделайте свой выбор, но оба включают несколько ненужных шагов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...