Apache Spark (Scala) Агрегация по времени с различными группами - PullRequest
0 голосов
/ 25 марта 2019

Я пытаюсь вычислить общее время, которое корабль проводит на якоре. Данные, с которыми я имею дело, носят временные ряды. На протяжении всего пути корабля из пункта А -> пункт Б он может останавливаться и начинаться несколько раз.

В принципе, для каждого идентификатора (уникальный идентификатор корабля) я хочу рассчитать общее время, проведенное на якоре (status === "ЯКОРЬ"). Для каждого «якорного» периода времени возьмите последнюю временную отметку и вычтите ее из первой временной отметки (или наоборот, я просто возьму абсолютное значение). Я могу сделать это легко, если корабль останавливается только один раз в пути (функция окна). Но у меня возникают проблемы, когда он останавливается и начинается несколько раз на протяжении всего путешествия. Может ли оконная функция справиться с этим?

Вот пример данных, с которыми я имею дело, и ожидаемый результат:

    val df = Seq(
        (123, "UNDERWAY", 0), 
        (123, "ANCHORED", 12), // first anchored (first time around)
        (123, "ANCHORED", 20), //take this timestamp and sub from previous
        (123, "UNDERWAY", 32), 
        (123, "UNDERWAY", 44), 
        (123, "ANCHORED", 50), // first anchored (second time around)
        (123, "ANCHORED", 65), 
        (123, "ANCHORED", 70), //take this timestamp and sub from previous
        (123, "ARRIVED", 79)
        ).toDF("id", "status", "time")

+---+--------+----+
|id |status  |time|
+---+--------+----+
|123|UNDERWAY|0   |
|123|ANCHORED|12  |
|123|ANCHORED|20  |
|123|UNDERWAY|32  |
|123|UNDERWAY|44  |
|123|ANCHORED|50  |
|123|ANCHORED|65  |
|123|ANCHORED|70  |
|123|ARRIVED |79  |
+---+--------+----+

// the resulting output I need is as follows (aggregation of total time spent at anchor)
// the ship spent 8 hours at anchor the first time, and then spent 
// 20 hours at anchor the second time. So total time is 28 hours
+---+-----------------+
|id |timeSpentAtAnchor|
+---+-----------------+
|123|28               |
+---+-----------------+

Каждый «сегмент» корабля находится на якоре. Я хочу рассчитать время, проведенное на якоре, а затем сложить все эти сегменты, чтобы получить общее время, проведенное на якоре.

1 Ответ

2 голосов
/ 25 марта 2019

Я новичок в Window функциях, так что, возможно, это можно сделать лучше, но вот что я придумал:

Это решение рассматривает только «this - previous», а не «last - first» в каждой «группе» статусов. Чистый эффект должен быть таким же, поскольку он все равно суммирует их вместе.

import org.apache.spark.sql.expressions.Window

val w = Window.orderBy($"time")

df.withColumn("tdiff", when($"status" === lag($"status", 1).over(w), $"time" - lag($"time", 1).over(w)))
  .where($"status" === lit("ANCHORED"))
  .groupBy("id", "status")
  .agg(sum("tdiff").as("timeSpentAtAnchor"))
  .select("id", "timeSpentAtAnchor")
  .show(false)

Что дает:

+---+-----------------+
|id |timeSpentAtAnchor|
+---+-----------------+
|123|28               |
+---+-----------------+

Ответ был сформирован с информацией из этого ответа. И, как там указано:

Примечание: , так как в этом примере не используется ни один раздел, может возникнуть проблема с производительностью, в ваших реальных данных было бы полезно, если бы ваша проблема могла быть разделена по некоторым переменным.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...