I could not get the window frame to start at the row after the current one (i.e. between the next row and unbounded following), so I used rows between current row and unbounded following and then dropped the first element of the array with a UDF. This uses a bit of everything: spark.sql, a UDF, and DataFrame manipulation. Check it out:
val df = Seq((1,"A","Happy","0","0"),(1,"B","Happy","1","0"),(1,"C","Happy","3","0"),(1,"D","Happy","5","0"),(1,"C","Happy","6","0"),(1,"D","Sad","6","0"),(1,"C","Sad","10","0"),(1,"A","Happy","28","0"),(1,"E","Happy","35","0"),(1,"E","Sad","60","0"),(2,"F","Happy","6","6"),(2,"E","Happy","17","6"),(2,"D","Happy","20","6"),(2,"D","Sad","21","6"),(2,"E","Happy","27","6"),(2,"G","Happy","37","6"),(2,"H","Happy","39","6"),(2,"G","Sad","45","6")).toDF("user_id","item_id","mood","time","sessionBegin")
val df2 = df.withColumn("time", 'time.cast("int"))
df2.createOrReplaceTempView("user")
val df3 = spark.sql(
  """
  select user_id, item_id, mood, time, sessionBegin,
         case when mood = 'Happy' then
                collect_list(case when mood = 'Happy' then ' ' when mood = 'Sad' then item_id end)
                  over (partition by user_id order by time
                        rows between current row and unbounded following)
              when mood = 'Sad' then array()
         end as result
  from user
  """)
// Drop the first element (the current row itself), filter out the ' '
// placeholders emitted for Happy rows, and de-duplicate what is left.
def sliceResult(x: Seq[String]): Seq[String] = {
  val y = x.drop(1).filter(_ != " ")
  y.toSet.toSeq
}
val udf_sliceResult = udf(sliceResult _)

df3.withColumn("result1", udf_sliceResult('result)).show(false)
Results:
+-------+-------+-----+----+------------+------------------------------+---------+
|user_id|item_id|mood |time|sessionBegin|result |result1 |
+-------+-------+-----+----+------------+------------------------------+---------+
|1 |A |Happy|0 |0 |[ , , , , , D, C, , , E]|[D, C, E]|
|1 |B |Happy|1 |0 |[ , , , , D, C, , , E] |[D, C, E]|
|1 |C |Happy|3 |0 |[ , , , D, C, , , E] |[D, C, E]|
|1 |D |Happy|5 |0 |[ , , D, C, , , E] |[D, C, E]|
|1 |C |Happy|6 |0 |[ , D, C, , , E] |[D, C, E]|
|1 |D |Sad |6 |0 |[] |[] |
|1 |C |Sad |10 |0 |[] |[] |
|1 |A |Happy|28 |0 |[ , , E] |[E] |
|1 |E |Happy|35 |0 |[ , E] |[E] |
|1 |E |Sad |60 |0 |[] |[] |
|2 |F |Happy|6 |6 |[ , , , D, , , , G] |[D, G] |
|2 |E |Happy|17 |6 |[ , , D, , , , G] |[D, G] |
|2 |D |Happy|20 |6 |[ , D, , , , G] |[D, G] |
|2 |D |Sad |21 |6 |[] |[] |
|2 |E |Happy|27 |6 |[ , , , G] |[G] |
|2 |G |Happy|37 |6 |[ , , G] |[G] |
|2 |H |Happy|39 |6 |[ , G] |[G] |
|2 |G |Sad |45 |6 |[] |[] |
+-------+-------+-----+----+------------+------------------------------+---------+
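On Spark 2.4+ the UDF is probably not needed at all: slice, array_remove, and array_distinct can do the same post-processing. A minimal sketch, assuming Spark 2.4+ and the df3 from above (df3NoUdf is just an illustrative name):

import org.apache.spark.sql.functions.{array_distinct, array_remove, expr}

// Drop the first element, strip the ' ' placeholders and de-duplicate,
// using only built-in array functions (Spark 2.4+).
val df3NoUdf = df3.withColumn(
  "result1",
  array_distinct(array_remove(expr("slice(result, 2, size(result))"), " "))
)
df3NoUdf.show(false)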
EDIT1:
As mentioned by the OP, the ' ' placeholder can be replaced with null, and then df3 itself is already the final result, so the udf() can be avoided:
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df3 = spark.sql(
"""
select user_id, item_id, mood, time, sessionBegin,
case when mood='Happy' then
collect_list(case when mood='Happy' then null when mood='Sad' then item_id end) over(partition by user_id order by time rows between current row and unbounded following )
when mood='Sad' then array()
end as result from user
""")
// Exiting paste mode, now interpreting.
df3: org.apache.spark.sql.DataFrame = [user_id: int, item_id: string ... 4 more fields]
scala> df3.show(false)
+-------+-------+-----+----+------------+---------+
|user_id|item_id|mood |time|sessionBegin|result |
+-------+-------+-----+----+------------+---------+
|1 |A |Happy|0 |0 |[D, C, E]|
|1 |B |Happy|1 |0 |[D, C, E]|
|1 |C |Happy|3 |0 |[D, C, E]|
|1 |D |Happy|5 |0 |[D, C, E]|
|1 |C |Happy|6 |0 |[D, C, E]|
|1 |D |Sad |6 |0 |[] |
|1 |C |Sad |10 |0 |[] |
|1 |A |Happy|28 |0 |[E] |
|1 |E |Happy|35 |0 |[E] |
|1 |E |Sad |60 |0 |[] |
|2 |F |Happy|6 |6 |[D, G] |
|2 |E |Happy|17 |6 |[D, G] |
|2 |D |Happy|20 |6 |[D, G] |
|2 |D |Sad |21 |6 |[] |
|2 |E |Happy|27 |6 |[G] |
|2 |G |Happy|37 |6 |[G] |
|2 |H |Happy|39 |6 |[G] |
|2 |G |Sad |45 |6 |[] |
+-------+-------+-----+----+------------+---------+
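For completeness, the same null-based query can also be expressed with the DataFrame window API instead of a temp view plus spark.sql. A rough sketch using the same df2 as above (w and resultDf are just illustrative names):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_list, typedLit, when}

// Same frame as the SQL version: from the current row to the end of the partition.
val w = Window
  .partitionBy("user_id")
  .orderBy("time")
  .rowsBetween(Window.currentRow, Window.unboundedFollowing)

// collect_list skips the nulls produced for Happy rows, so only the item_id
// of later Sad rows ends up in the array; Sad rows get an empty array.
val resultDf = df2.withColumn(
  "result",
  when(col("mood") === "Happy",
    collect_list(when(col("mood") === "Sad", col("item_id"))).over(w))
    .otherwise(typedLit(Seq.empty[String]))
)
resultDf.show(false)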