Извлечение временных интервалов в кадре данных Scala Spark - PullRequest
1 голос
/ 08 марта 2019

Я пытаюсь извлечь объединенные интервалы данных на основе временных рядов в scala и spark

У меня есть следующие данные в кадре данных:

Id | State | StartTime           | EndTime
---+-------+---------------------+--------------------
 1 |   R   | 2019-01-01T03:00:00 | 2019-01-01T11:30:00
 1 |   R   | 2019-01-01T11:30:00 | 2019-01-01T15:00:00
 1 |   R   | 2019-01-01T15:00:00 | 2019-01-01T22:00:00
 1 |   W   | 2019-01-01T22:00:00 | 2019-01-02T04:30:00
 1 |   W   | 2019-01-02T04:30:00 | 2019-01-02T13:45:00
 1 |   R   | 2019-01-02T13:45:00 | 2019-01-02T18:30:00
 1 |   R   | 2019-01-02T18:30:00 | 2019-01-02T22:45:00

Мне нужно извлечь данные во временные интервалы на основе идентификатора и состояния. Полученные данные должны выглядеть следующим образом:

Id | State | StartTime           | EndTime
---+-------+---------------------+--------------------
 1 |   R   | 2019-01-01T03:00:00 | 2019-01-01T22:00:00
 1 |   W   | 2019-01-01T22:00:00 | 2019-01-02T13:45:00
 1 |   R   | 2019-01-02T13:45:00 | 2019-01-02T22:45:00

Обратите внимание, что первые три записи были сгруппированы вместе, потому что оборудование непрерывно находится в состоянии R с 2019-01-01T03: 00: 00 до 2019-01-01T22: 00: 00, затем оно переключается в состояние W для следующих двух записей с 2019-01-01T22: 00: 00 до 2019-01-02T13: 45: 00 и затем обратно в состояние R для последних двух записей.

Ответы [ 2 ]

0 голосов
/ 16 марта 2019

Поскольку у меня недавно был похожий случай, я хотел бы предоставить полное решение для этого случая.Часть кода:

val df2 = df
  .select('Id,'State,'StartTime,'EndTime,
          row_number().over(idSpec).as("idRowNumber"),
          row_number().over(idStateSpec).as("idStateRowNumber"))

Имеет вывод:

+---+-----+-------------------+-------------------+-----------+----------------+
| Id|State|          StartTime|            EndTime|idRowNumber|idStateRowNumber|
+---+-----+-------------------+-------------------+-----------+----------------+
|  1|    R|2019-01-01 03:00:00|2019-01-01 11:30:00|          1|               1|
|  1|    R|2019-01-01 11:30:00|2019-01-01 15:00:00|          2|               2|
|  1|    R|2019-01-01 15:00:00|2019-01-01 22:00:00|          3|               3|
|  1|    W|2019-01-01 22:00:00|2019-01-02 04:30:00|          4|               1|
|  1|    W|2019-01-02 04:30:00|2019-01-02 13:45:00|          5|               2|
|  1|    R|2019-01-02 13:45:00|2019-01-02 18:30:00|          6|               4|
|  1|    R|2019-01-02 18:30:00|2019-01-02 22:45:00|          7|               5|
+---+-----+-------------------+-------------------+-----------+----------------+

Обратите внимание, что разница между idRowNumber и idStateRowNumber будет идентичнадля каждой комбинации (Id, State) отсюда мы можем создать новый столбец с именем category и group, чтобы получить min StartTime и max EndTime для каждой группы.Полный код должен выглядеть следующим образом:

val idSpec = Window.partitionBy('Id).orderBy('StartTime)
val idStateSpec = Window.partitionBy('Id,'State).orderBy('StartTime)

val df2 = df
  .select('Id,'State,'StartTime.cast("timestamp"),'EndTime.cast("timestamp"),
          row_number().over(idSpec).as("idRowNumber"),
          row_number().over(idStateSpec).as("idStateRowNumber"))
  .withColumn("Category", $"idRowNumber" - $"idStateRowNumber")
  .groupBy("Category", "Id", "State")
  .agg(min("StartTime").as("StartTime"), max("EndTime").as("EndTime"))
  .drop("Category")

И вывод:

+---+-----+-------------------+-------------------+
| Id|State|          StartTime|            EndTime|
+---+-----+-------------------+-------------------+
|  1|    R|2019-01-01 03:00:00|2019-01-01 22:00:00|
|  1|    W|2019-01-01 22:00:00|2019-01-02 13:45:00|
|  1|    R|2019-01-02 13:45:00|2019-01-02 22:45:00|
+---+-----+-------------------+-------------------+
0 голосов
/ 15 марта 2019

Получается, что ответ на этот вопрос Объедините строки, когда время окончания одного - это время начала другого (Oracle) , переведенное в Spark.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col,row_number}
import spark.implicits._

val idSpec = Window.partitionBy('Id).orderBy('StartTime)
val idStateSpec = Window.partitionBy('Id,'State).orderBy('StartTime)
val df2 = df
  .select('Id,'State,'StartTime,'EndTime,
          row_number().over(idSpec).as("idRowNumber"),
          row_number().over(idStateSpec).as("idStateRowNumber"))
  .groupBy('Id,'State,'idRowNumber - 'idStateRowNumber)
  .agg(min('StartTime).as("StartTime"), max('EndTime).as("EndTime"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...