Я пытаюсь сшить несколько строк событий в кадре данных вместе на основе разницы во времени между ними. Я создал новый столбец в кадре данных, который представляет разницу во времени с предыдущей строкой с использованием задержки. Фрейм данных выглядит следующим образом:
sc=spark.sparkContext
df = spark.createDataFrame(
sc.parallelize(
[['x',1, "9999"], ['x',2, "120"], ['x',3, "102"], ['x',4, "3000"],['x',5, "299"],['x',6, "100"]]
),
['id',"row_number", "time_diff"]
)
Я хочу сшить строки, если time_diff с предыдущим событием меньше 160. Для этого я планировал присвоить новые номера строк всем событиям, которые находятся в пределах 160 раз друг от друга, а затем возьмите groupby на новом номере строки
Для приведенного выше фрейма данных мне нужен вывод как:
+------+----------+---------+--------------+
|id. |row_number|time_diff|new_row_number|
+------+----------+---------+--------------+
| x| 1 | 9999| 1|
| x| 2 | 120| 1|
| x| 3 | 102| 1|
| x| 4 | 3000| 4|
| x| 5 | 299| 5|
| x| 6 | 100| 5|
+------+----------+---------+--------------+
Я написал следующую программу:
from pyspark.sql.functions import when,col
window = Window.partitionBy('id').orderBy('row_number')
df2=df.withColumn('new_row_number', col('id'))
df3=df2.withColumn('new_row_number', when(col('time_diff')>=160, col('id'))\
.otherwise(f.lag(col('new_row_number')).over(window)))
, но результат был следующим:
+------+----------+---------+--------------+
|id. |row_number|time_diff|new_row_number|
+------+----------+---------+--------------+
| x| 1 | 9999| 1|
| x| 2 | 120| 1|
| x| 3 | 102| 2|
| x| 4 | 3000| 4|
| x| 5 | 299| 5|
| x| 6 | 100| 5|
+------+----------+---------+--------------+
Может ли кто-нибудь помочь мне решить эту проблему? Спасибо