Pyspark: сшивание нескольких строк событий в windows - PullRequest
0 голосов
/ 06 мая 2020

Я пытаюсь сшить несколько строк событий в кадре данных вместе на основе разницы во времени между ними. Я создал новый столбец в кадре данных, который представляет разницу во времени с предыдущей строкой с использованием задержки. Фрейм данных выглядит следующим образом:

sc=spark.sparkContext
df = spark.createDataFrame(
    sc.parallelize(
        [['x',1, "9999"], ['x',2, "120"], ['x',3, "102"], ['x',4, "3000"],['x',5, "299"],['x',6, "100"]]
    ), 
    ['id',"row_number", "time_diff"]
)

Я хочу сшить строки, если time_diff с предыдущим событием меньше 160. Для этого я планировал присвоить новые номера строк всем событиям, которые находятся в пределах 160 раз друг от друга, а затем возьмите groupby на новом номере строки

Для приведенного выше фрейма данных мне нужен вывод как:

   +------+----------+---------+--------------+
    |id.   |row_number|time_diff|new_row_number|
    +------+----------+---------+--------------+
    |     x|  1       |     9999|             1|
    |     x|  2       |      120|             1|
    |     x|  3       |      102|             1|
    |     x|  4       |     3000|             4|
    |     x|  5       |      299|             5|
    |     x|  6       |      100|             5|
    +------+----------+---------+--------------+

Я написал следующую программу:

from pyspark.sql.functions import when,col

window = Window.partitionBy('id').orderBy('row_number')

df2=df.withColumn('new_row_number', col('id'))
df3=df2.withColumn('new_row_number', when(col('time_diff')>=160, col('id'))\
                       .otherwise(f.lag(col('new_row_number')).over(window)))

, но результат был следующим:

+------+----------+---------+--------------+
|id.   |row_number|time_diff|new_row_number|
+------+----------+---------+--------------+
|     x|  1       |     9999|             1|
|     x|  2       |      120|             1|
|     x|  3       |      102|             2|
|     x|  4       |     3000|             4|
|     x|  5       |      299|             5|
|     x|  6       |      100|             5|
+------+----------+---------+--------------+

Может ли кто-нибудь помочь мне решить эту проблему? Спасибо

1 Ответ

1 голос
/ 06 мая 2020

Итак, вы хотите, чтобы предыдущее значение столбца, которое в настоящее время заполняется, невозможно, поэтому для этого мы можем сделать следующее:

window = Window.partitionBy('id').orderBy('row_number')
df3=df.withColumn('new_row_number', f.when(f.col('time_diff')>=160, f.col('row_number')))\
      .withColumn("new_row_number", f.last(f.col("new_row_number"), ignorenulls=True).over(window))

+---+----------+---------+--------------+
| id|row_number|time_diff|new_row_number|
+---+----------+---------+--------------+
|  x|         1|     9999|             1|
|  x|         2|      120|             1|
|  x|         3|      102|             1|
|  x|         4|     3000|             4|
|  x|         5|      299|             5|
|  x|         6|      100|             5|
+---+----------+---------+--------------+

Чтобы объяснить:

Сначала мы генерируем значение строки для каждой строки, которая больше 160, иначе null

df2=df.withColumn('new_row_number', f.when(f.col('time_diff')>=160, f.col('row_number')))
df2.show()

+---+----------+---------+--------------+
| id|row_number|time_diff|new_row_number|
+---+----------+---------+--------------+
|  x|         1|     9999|             1|
|  x|         2|      120|          null|
|  x|         3|      102|          null|
|  x|         4|     3000|             4|
|  x|         5|      299|             5|
|  x|         6|      100|          null|
+---+----------+---------+--------------+

Затем мы заполняем фрейм данных последним значением, используя это

df3=df2.withColumn("new_row_number", f.last(f.col("new_row_number"), ignorenulls=True).over(window))
df3.show()

+---+----------+---------+--------------+
| id|row_number|time_diff|new_row_number|
+---+----------+---------+--------------+
|  x|         1|     9999|             1|
|  x|         2|      120|             1|
|  x|         3|      102|             1|
|  x|         4|     3000|             4|
|  x|         5|      299|             5|
|  x|         6|      100|             5|
+---+----------+---------+--------------+

Надеюсь, это решит ваш вопрос.

...