Превратить несколько строк событий с временными метками в кадре данных в одну строку с начальной и конечной датой-временем - PullRequest
0 голосов
/ 25 сентября 2018

У меня есть строки для одного устройства, и я хотел бы сгруппировать все те же события, которые происходят в последовательности.

Я также хотел бы сделать это с pyspark

Итак, учитывая следующее:

+--------------------+-------+
|      datetime      | event |
+--------------------+-------+
| 12-02-18T08:20:00  |     1 |
| 12-02-18T08:25:00  |     1 |
| 12-02-18T08:30:00  |     1 |
| 12-02-18T09:00:00  |     2 |
| 12-02-18T09:05:00  |     2 |
| 12-02-18T09:10:00  |     1 |
| 12-02-18T09:15:00  |     1 |
+--------------------+-------+

Я хотел бы в итоге следующее:

+-------------------+-------------------+-------+
|    start_time     |     end_time      | event |
+-------------------+-------------------+-------+
| 12-02-18T08:20:00 | 12-02-18T09:00:00 |     1 |
| 12-02-18T09:00:00 | 12-02-18T09:10:00 |     2 |
| 12-02-18T09:10:00 | null              |     1 |
+-------------------+-------------------+-------+

Не будет перекрывающихся событий, поэтому их не нужно рассматривать.Я учил делать это с UDF, но мне было интересно, если кто-нибудь знает о более элегантном / эффективном способе.

Ответы [ 2 ]

0 голосов
/ 26 сентября 2018

С подходом, предоставленным Florian (оконная функция), можно сделать, взяв строки с измененными событиями, а затем взять следующую измененную дату в Scala:

val df = List(
  ("12-02-18T08:20:00", 1),
  ("12-02-18T08:25:00", 1),
  ("12-02-18T08:30:00", 1),
  ("12-02-18T09:00:00", 2),
  ("12-02-18T09:05:00", 2),
  ("12-02-18T09:10:00", 1),
  ("12-02-18T09:15:00", 1)
).toDF("datetime", "event")
df.show(false)

val w = Window.orderBy("datetime")
val changedRowsOnlyDF = df.withColumn("changed", $"event" =!= lag($"event", 1, 0).over(w))
  .where($"changed")

val result = changedRowsOnlyDF
  .withColumn("end_time", lead($"datetime", 1).over(w))
  .drop("changed")
  .withColumnRenamed("datetime", "start_time")
result.show(false)

Вывод:

+-----------------+-----+-----------------+
|start_time       |event|end_time         |
+-----------------+-----+-----------------+
|12-02-18T08:20:00|1    |12-02-18T09:00:00|
|12-02-18T09:00:00|2    |12-02-18T09:10:00|
|12-02-18T09:10:00|1    |null             |
+-----------------+-----+-----------------+

Отказ от ответственности : такой подход может использоваться для небольших объемов данных, Spark уведомляется сообщением:

WARN org.apache.spark.sql.execution.window.WindowExec: Не определен раздел для работы с окном!Перемещение всех данных в один раздел может привести к серьезному снижению производительности.

0 голосов
/ 26 сентября 2018

Вместо использования UDF вы можете использовать Window, чтобы найти переходы в событиях и создать новый столбец из этого для использования в группировке (см. этот ответ ).Затем мы можем объединиться, чтобы найти минимальное и максимальное количество раз за событие.Рабочий пример приведен ниже, надеюсь, это поможет!

import pyspark.sql.functions as F
from pyspark.sql import Window

# SAMPLE DATA
df = spark.sparkContext.parallelize([
    ('2018-07-20T01:00:00.000Z','1'),
    ('2018-07-20T02:00:00.000Z','1'),
    ('2018-07-20T03:00:00.000Z','2'),
    ('2018-07-20T04:00:00.000Z','2'),
    ('2018-07-20T05:00:00.000Z','1')
]).toDF(("datetime","event" ))

# CALCULATE START AND END TIMES
w = Window.orderBy('datetime')
df_result = (df
    .withColumn("changed", (F.col('event') != F.lag('event', 1, 0).over(w)).cast('int'))
    .withColumn("group_id", F.sum("changed").over(w)).drop("changed")
    .groupBy('group_id','event').agg(
    F.min('datetime').alias('start'),
    F.max('datetime').alias('end'))
    .drop('group_id'))

df_result.show()

Вывод:

+-----+--------------------+--------------------+
|event|               start|                 end|
+-----+--------------------+--------------------+
|    1|2018-07-20T01:00:...|2018-07-20T02:00:...|
|    2|2018-07-20T03:00:...|2018-07-20T04:00:...|
|    1|2018-07-20T05:00:...|2018-07-20T05:00:...|
+-----+--------------------+--------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...