Spark: рассчитать время окончания события с 30-минутными интервалами на основе значений времени начала и продолжительности в предыдущих строках. - PullRequest
2 голосов
/ 16 апреля 2019

У меня есть файл с полем event_time, каждая запись генерируется каждые 30 минут и указывает, сколько секунд длилось событие.Пример:

Event_time | event_duration_seconds
09:00      | 800
09:30      | 1800
10:00      | 2700
12:00      | 1000
13:00      | 1000

Мне нужно преобразовать последовательные события только в одно с его продолжительностью.Выходной файл должен выглядеть следующим образом:

Event_time_start | event_time_end | event_duration_seconds
09:00            | 11:00          | 5300
12:00            | 12:30          | 1000
13:00            | 13:30          | 1000

Есть ли в Scala Spark метод для сравнения записи в фрейме данных со следующей?

Я пытался с foreach цикл, но это не очень хороший вариант, так как это большой объем данных для обработки

1 Ответ

1 голос
/ 16 апреля 2019

Не тривиальная проблема, но вот решение с шагами следующим образом:

  1. Создание UDF для вычисления следующего ближайшего 30-минутного времени окончания события event_ts_end с использованием java.time API
  2. Использовать оконную функцию lag для времени события из предыдущего ряда
  3. Используйте when/otherwise для генерации столбца event_ts_start со значением null, если разница во времени события из предыдущей строки составляет 30 минут
  4. Используйте оконную функцию last(event_ts_start, ignoreNulls=true) для обратной засыпки null с последним event_ts_start значением
  5. Группировать данные по event_ts_start для агрегирования event_duration и event_ts_end

Сначала давайте соберем примерный набор данных:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = Seq(
  (101, "2019-04-01 09:00", 800),
  (101, "2019-04-01 09:30", 1800),
  (101, "2019-04-01 10:00", 2700),
  (101, "2019-04-01 12:00", 1000),
  (101, "2019-04-01 13:00", 1000),
  (220, "2019-04-02 10:00", 1500),
  (220, "2019-04-02 10:30", 2400)
).toDF("event_id", "event_time", "event_duration")

Обратите внимание, что примерный набор данных был немного обобщен, чтобы включать более одного события, и сделать время события включающим date информацию, чтобы охватить случаи, когда событие пересекает данную дату.

Шаг 1:

import java.sql.Timestamp

def get_next_closest(seconds: Int) = udf{ (ts: Timestamp, duration: Int) =>
  import java.time.LocalDateTime
  import java.time.format.DateTimeFormatter

  val iter = Iterator.iterate(ts.toLocalDateTime)(_.plusSeconds(seconds)).
    dropWhile(_.isBefore(ts.toLocalDateTime.plusSeconds(duration)))

  iter.next.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
}

Шаги 2 - 5:

val winSpec = Window.partitionBy("event_id").orderBy("event_time")

val seconds = 30 * 60

df.
  withColumn("event_ts", to_timestamp($"event_time", "yyyy-MM-dd HH:mm")).
  withColumn("event_ts_end", get_next_closest(seconds)($"event_ts", $"event_duration")).
  withColumn("prev_event_ts", lag($"event_ts", 1).over(winSpec)).
  withColumn("event_ts_start",  when($"prev_event_ts".isNull ||
    unix_timestamp($"event_ts") - unix_timestamp($"prev_event_ts") =!= seconds, $"event_ts"
  )).
  withColumn("event_ts_start", last($"event_ts_start", ignoreNulls=true).over(winSpec)).
  groupBy($"event_id", $"event_ts_start").agg(
    sum($"event_duration").as("event_duration"), max($"event_ts_end").as("event_ts_end")
  ).show
// +--------+-------------------+--------------+-------------------+
// |event_id|     event_ts_start|event_duration|       event_ts_end|
// +--------+-------------------+--------------+-------------------+
// |     101|2019-04-01 09:00:00|          5300|2019-04-01 11:00:00|
// |     101|2019-04-01 12:00:00|          1000|2019-04-01 12:30:00|
// |     101|2019-04-01 13:00:00|          1000|2019-04-01 13:30:00|
// |     220|2019-04-02 10:00:00|          3900|2019-04-02 11:30:00|
// +--------+-------------------+--------------+-------------------+
...