Другой SQL-подход с использованием тех же входных данных.
PySpark
>>> from pyspark.sql.functions import *
>>> df = sc.parallelize([("asdc24","conn",1),
... ("asdc24","conn",2),
... ("asdc24","conn",5),
... ("adsfa6","conn",1),
... ("adsfa6","conn",3),
... ("asdc24","conn",9),
... ("adsfa6","conn",5),
... ("asdc24","conn",11),
... ("adsfa6","conn",10),
... ("asdc24","conn",15)]).toDF(["user_id","action","day"])
>>> df.createOrReplaceTempView("qubix")
>>> spark.sql(" select * from qubix order by user_id, day").show()
+-------+------+---+
|user_id|action|day|
+-------+------+---+
| adsfa6| conn| 1|
| adsfa6| conn| 3|
| adsfa6| conn| 5|
| adsfa6| conn| 10|
| asdc24| conn| 1|
| asdc24| conn| 2|
| asdc24| conn| 5|
| asdc24| conn| 9|
| asdc24| conn| 11|
| asdc24| conn| 15|
+-------+------+---+
>>> spark.sql(""" with t1 (select user_id,action, day,lead(day) over(partition by user_id order by day) ld from qubix), t2 (select user_id from t1 where ld-t1.day=1 ) select * from qubix where user_id in (select user_id from t2) """).show()
+-------+------+---+
|user_id|action|day|
+-------+------+---+
| asdc24| conn| 1|
| asdc24| conn| 2|
| asdc24| conn| 5|
| asdc24| conn| 9|
| asdc24| conn| 11|
| asdc24| conn| 15|
+-------+------+---+
>>>
Версия Scala
scala> val df = Seq(("asdc24","conn",1),
| ("asdc24","conn",2),
| ("asdc24","conn",5),
| ("adsfa6","conn",1),
| ("adsfa6","conn",3),
| ("asdc24","conn",9),
| ("adsfa6","conn",5),
| ("asdc24","conn",11),
| ("adsfa6","conn",10),
| ("asdc24","conn",15)).toDF("user_id","action","day")
df: org.apache.spark.sql.DataFrame = [user_id: string, action: string ... 1 more field]
scala> df.orderBy('user_id,'day).show(false)
+-------+------+---+
|user_id|action|day|
+-------+------+---+
|adsfa6 |conn |1 |
|adsfa6 |conn |3 |
|adsfa6 |conn |5 |
|adsfa6 |conn |10 |
|asdc24 |conn |1 |
|asdc24 |conn |2 |
|asdc24 |conn |5 |
|asdc24 |conn |9 |
|asdc24 |conn |11 |
|asdc24 |conn |15 |
+-------+------+---+
scala> df.createOrReplaceTempView("qubix")
scala> spark.sql(""" with t1 (select user_id,action, day,lead(day) over(partition by user_id order by day) ld from qubix), t2 (select user_id from t1 where ld-t1.day=1 ) select * from qubix where user_id in (select user_id from t2) """).show(false)
+-------+------+---+
|user_id|action|day|
+-------+------+---+
|asdc24 |conn |1 |
|asdc24 |conn |2 |
|asdc24 |conn |5 |
|asdc24 |conn |9 |
|asdc24 |conn |11 |
|asdc24 |conn |15 |
+-------+------+---+
scala>