В то время как подход groupBy/pivot
хорошо группировал бы временные метки, для выполнения необходимой фильтрации потребовались бы нетривиальные шаги (скорее всего, UDF) с последующим повторным расширением. Вот другой подход со следующими шагами:
- Фильтровать набор данных только для
statusCode
"УФ" или "ОА"
- Для каждой строки используйте оконные функции для создания строки
statusCode
из previous, current, and next 2 rows
- Используйте
Regex
сопоставление с образцом для определения нужных строк
Пример кода ниже:
import java.sql.Timestamp
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
// Sample data:
// key `A`: requirement #3
// key `B`: requirement #2
// key `C`: requirement #4
val df = Seq(
("A", "OA", Timestamp.valueOf("2019-05-20 00:00:00")),
("A", "E", Timestamp.valueOf("2019-05-30 00:00:00")),
("A", "UV", Timestamp.valueOf("2019-06-22 00:00:00")),
("A", "OA", Timestamp.valueOf("2019-07-01 00:00:00")),
("A", "OA", Timestamp.valueOf("2019-07-03 00:00:00")),
("B", "C", Timestamp.valueOf("2019-06-15 00:00:00")),
("B", "OA", Timestamp.valueOf("2019-06-25 00:00:00")),
("C", "D", Timestamp.valueOf("2019-06-01 00:00:00")),
("C", "OA", Timestamp.valueOf("2019-06-30 00:00:00")),
("C", "UV", Timestamp.valueOf("2019-07-02 00:00:00"))
).toDF("key", "statusCode", "statusTimestamp")
val win = Window.partitionBy("key").orderBy("statusTimestamp")
val df2 = df.
where($"statusCode" === "UV" || $"statusCode" === "OA").
withColumn("statusPrevCurrNext2", concat(
coalesce(lag($"statusCode", 1).over(win), lit("")),
lit("#"),
$"statusCode",
lit("#"),
coalesce(lead($"statusCode", 1).over(win), lit("")),
lit("#"),
coalesce(lead($"statusCode", 2).over(win), lit(""))
))
Давайте посмотрим на df2
(результат шагов 1
и 2
):
df2.show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |B |OA |2019-06-25 00:00:00|#OA## |
// |C |OA |2019-06-30 00:00:00|#OA#UV# | <-- Req #4: Ends with `#UV#`
// |C |UV |2019-07-02 00:00:00|OA#UV## | <-- Req #4: Ends with `#UV##`
// |A |OA |2019-05-20 00:00:00|#OA#UV#OA |
// |A |UV |2019-06-22 00:00:00|OA#UV#OA#OA | <-- Req #3: Starts with `[^#]*#UV#`
// |A |OA |2019-07-01 00:00:00|UV#OA#OA# | <-- Req #3: starts with `UV#`
// |A |OA |2019-07-03 00:00:00|OA#OA## |
// +---+----------+-------------------+-------------------+
Применение шага 3
:
df2.
where($"statusPrevCurrNext2".rlike("^[^#]*#UV#.*|^UV#.*|.*#UV#+$")).
orderBy("key", "statusTimestamp").
show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |A |UV |2019-06-22 00:00:00|OA#UV#OA#OA |
// |A |OA |2019-07-01 00:00:00|UV#OA#OA# |
// |C |OA |2019-06-30 00:00:00|#OA#UV# |
// |C |UV |2019-07-02 00:00:00|OA#UV## |
// +---+----------+-------------------+-------------------+