Как перебрать сгруппированные строки для создания нескольких строк в потоковой структурированной искре? - PullRequest
0 голосов
/ 31 декабря 2018

У меня есть входные данные, такие как:

id     operation          value
1      null                1
1      discard             0
2      null                1
2      null                2
2      max                 0
3      null                1
3      null                1
3      list                0

Я хочу сгруппировать входные данные и создать строки в соответствии со столбцом «операция».

для группы 1, операция = «сброс»", тогда выход равен нулю,

для группы 2, операция =" макс. ", выход:

2      null                2

для группы 3, операция =" список ", выход:

3      null                1
3      null                1

Итак, в итоге получается что-то вроде:

  id     operation          value
   2      null                2
   3      null                1
   3      null                1

Есть ли решение для этого?

Я знаю, что есть похожий вопрос как-to-iterate-grouped-data-in-spark Но различия по сравнению с этим следующие:

    1. Я хочу создать более одной строки для каждой сгруппированных данных,Возможно и как?
    2. Я хочу, чтобы моя логика была легко расширена, чтобы в будущем было добавлено больше операций.Таким образом, пользовательские агрегатные функции (или UDAF) - единственно возможное решение?

Обновление 1:

Спасибо stack0114106, затем больше подробностей в соответствии с его ответом, например, для id = 1, operation = "max", я хочу перебрать весь элемент с id = 2 и найти максимальное значение, а не назначать жестко запрограммированныйзначение, поэтому я хочу перебирать строки в каждой группе.Ниже приведен обновленный пример:

Ввод:

scala> val df = Seq((0,null,1),(0,"discard",0),(1,null,1),(1,null,2),(1,"max",0),(2,null,1),(2,null,3),(2,"max",0),(3,null,1),(3,null,1),(3,"list",0)).toDF("id"
,"operation","value")
df: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 1 more field]

scala> df.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|0  |null     |1    |
|0  |discard  |0    |
|1  |null     |1    |
|1  |null     |2    |
|1  |max      |0    |
|2  |null     |1    |
|2  |null     |3    |
|2  |max      |0    |
|3  |null     |1    |
|3  |null     |1    |
|3  |list     |0    |
+---+---------+-----+

Ожидаемый вывод:

+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|1  |null     |2    |
|2  |null     |3    |
|3  |null     |1    |
|3  |null     |1    |
+---+---------+-----+

Ответы [ 2 ]

0 голосов
/ 01 января 2019

группирует все, собирая значения, затем записывает логику для каждой операции:

import org.apache.spark.sql.functions._
val grouped=df.groupBy($"id").agg(max($"operation").as("op"),collect_list($"value").as("vals"))
val maxs=grouped.filter($"op"==="max").withColumn("val",explode($"vals")).groupBy($"id").agg(max("val").as("value"))
val lists=grouped.filter($"op"==="list").withColumn("value",explode($"vals")).filter($"value"!==0).select($"id",$"value")
//we don't collect the "discard"
//and we can add additional subsets for new "operations"
val result=maxs.union(lists)
//if you need the null in "operation" column add it with withColumn
0 голосов
/ 31 декабря 2018

Вы можете использовать операцию flatMap на фрейме данных и генерировать необходимые строки на основе условий, которые вы упомянули.Проверьте это

scala> val df = Seq((1,null,1),(1,"discard",0),(2,null,1),(2,null,2),(2,"max",0),(3,null,1),(3,null,1),(3,"list",0)).toDF("id","operation","value")
df: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 1 more field]

scala> df.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|1  |null     |1    |
|1  |discard  |0    |
|2  |null     |1    |
|2  |null     |2    |
|2  |max      |0    |
|3  |null     |1    |
|3  |null     |1    |
|3  |list     |0    |
+---+---------+-----+


scala> df.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => (0,0) case "max" => (1,2) case "list" => (2,1) } ; (0
 until s._1).map( i => (r.getInt(0),null,s._2) ) }).show(false)
+---+----+---+
|_1 |_2  |_3 |
+---+----+---+
|2  |null|2  |
|3  |null|1  |
|3  |null|1  |
+---+----+---+

Spark назначает _1, _2 и т. Д., Чтобы вы могли сопоставить их с реальными именами, назначив их, как показано ниже

scala> val df2 = df.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => (0,0) case "max" => (1,2) case "list" => (2,1) } ; (0 until s._1).map( i => (r.getInt(0),null,s._2) ) }).toDF("id","operation","value")
df2: org.apache.spark.sql.DataFrame = [id: int, operation: null ... 1 more field]

scala> df2.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|2  |null     |2    |
|3  |null     |1    |
|3  |null     |1    |
+---+---------+-----+


scala>

EDIT1:

Поскольку для каждого идентификатора требуется максимальное значение (значение), вы можете использовать оконные функции и получить максимальное значение в новом столбце, а затем использовать ту же технику и получить результаты.Проверьте это

scala> val df =   Seq((0,null,1),(0,"discard",0),(1,null,1),(1,null,2),(1,"max",0),(2,null,1),(2,null,3),(2,"max",0),(3,null,1),(3,null,1),(3,"list",0)).toDF("id","operation","value")
df: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 1 more field]

scala> df.createOrReplaceTempView("michael")

scala> val df2 = spark.sql(""" select *, max(value) over(partition by id) mx from michael """)
df2: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 2 more fields]

scala> df2.show(false)
+---+---------+-----+---+
|id |operation|value|mx |
+---+---------+-----+---+
|1  |null     |1    |2  |
|1  |null     |2    |2  |
|1  |max      |0    |2  |
|3  |null     |1    |1  |
|3  |null     |1    |1  |
|3  |list     |0    |1  |
|2  |null     |1    |3  |
|2  |null     |3    |3  |
|2  |max      |0    |3  |
|0  |null     |1    |1  |
|0  |discard  |0    |1  |
+---+---------+-----+---+


scala> val df3 = df2.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => 0 case "max" => 1 case "list" => 2 } ; (0 until s).map( i => (r.getInt(0),null,r.getInt(3) )) }).toDF("id","operation","value")
df3: org.apache.spark.sql.DataFrame = [id: int, operation: null ... 1 more field]


scala> df3.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|1  |null     |2    |
|3  |null     |1    |
|3  |null     |1    |
|2  |null     |3    |
+---+---------+-----+


scala>
...