Вы можете использовать операцию flatMap на фрейме данных и генерировать необходимые строки на основе условий, которые вы упомянули.Проверьте это
scala> val df = Seq((1,null,1),(1,"discard",0),(2,null,1),(2,null,2),(2,"max",0),(3,null,1),(3,null,1),(3,"list",0)).toDF("id","operation","value")
df: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 1 more field]
scala> df.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|1 |null |1 |
|1 |discard |0 |
|2 |null |1 |
|2 |null |2 |
|2 |max |0 |
|3 |null |1 |
|3 |null |1 |
|3 |list |0 |
+---+---------+-----+
scala> df.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => (0,0) case "max" => (1,2) case "list" => (2,1) } ; (0
until s._1).map( i => (r.getInt(0),null,s._2) ) }).show(false)
+---+----+---+
|_1 |_2 |_3 |
+---+----+---+
|2 |null|2 |
|3 |null|1 |
|3 |null|1 |
+---+----+---+
Spark назначает _1, _2 и т. Д., Чтобы вы могли сопоставить их с реальными именами, назначив их, как показано ниже
scala> val df2 = df.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => (0,0) case "max" => (1,2) case "list" => (2,1) } ; (0 until s._1).map( i => (r.getInt(0),null,s._2) ) }).toDF("id","operation","value")
df2: org.apache.spark.sql.DataFrame = [id: int, operation: null ... 1 more field]
scala> df2.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|2 |null |2 |
|3 |null |1 |
|3 |null |1 |
+---+---------+-----+
scala>
EDIT1:
Поскольку для каждого идентификатора требуется максимальное значение (значение), вы можете использовать оконные функции и получить максимальное значение в новом столбце, а затем использовать ту же технику и получить результаты.Проверьте это
scala> val df = Seq((0,null,1),(0,"discard",0),(1,null,1),(1,null,2),(1,"max",0),(2,null,1),(2,null,3),(2,"max",0),(3,null,1),(3,null,1),(3,"list",0)).toDF("id","operation","value")
df: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 1 more field]
scala> df.createOrReplaceTempView("michael")
scala> val df2 = spark.sql(""" select *, max(value) over(partition by id) mx from michael """)
df2: org.apache.spark.sql.DataFrame = [id: int, operation: string ... 2 more fields]
scala> df2.show(false)
+---+---------+-----+---+
|id |operation|value|mx |
+---+---------+-----+---+
|1 |null |1 |2 |
|1 |null |2 |2 |
|1 |max |0 |2 |
|3 |null |1 |1 |
|3 |null |1 |1 |
|3 |list |0 |1 |
|2 |null |1 |3 |
|2 |null |3 |3 |
|2 |max |0 |3 |
|0 |null |1 |1 |
|0 |discard |0 |1 |
+---+---------+-----+---+
scala> val df3 = df2.filter("operation is not null").flatMap( r=> { val x=r.getString(1); val s = x match { case "discard" => 0 case "max" => 1 case "list" => 2 } ; (0 until s).map( i => (r.getInt(0),null,r.getInt(3) )) }).toDF("id","operation","value")
df3: org.apache.spark.sql.DataFrame = [id: int, operation: null ... 1 more field]
scala> df3.show(false)
+---+---------+-----+
|id |operation|value|
+---+---------+-----+
|1 |null |2 |
|3 |null |1 |
|3 |null |1 |
|2 |null |3 |
+---+---------+-----+
scala>