Spark-Scala: фильтрация набора данных Spark на основе нескольких столбцов и условий - PullRequest
1 голос
/ 10 июля 2019

Мне сложно найти хороший способ отфильтровать набор данных искры. Я описал основную проблему ниже:

  1. Для каждого ключа проверьте, есть ли код состояния === UV .
  2. Если с этим ключом не связан код состояния UV , этот ключ полностью игнорируется.
    • Обратите внимание: Должен быть только когда-либо один УФ для каждой клавиши.
  3. Если есть, то искать ближайшее OA событие, которое после UV отметки времени.
    • Обратите внимание: После отметки времени UV может быть несколько событий OA . Мне нужна ближайшая к отметке времени UV .
  4. Если единственное событие OA находится в прошлом (то есть до UV , я все еще хочу сохранить эту запись, потому что ожидается ожидаемое OA , но Я все еще хочу захватить строку с кодом состояния OA , но заменить значение будет null

Input

+-----------+----------+-------------------+
|key        |statusCode|statusTimestamp    |
+-----------+----------+-------------------+
|AAAAAABBBBB|OA        |2019-05-24 14:46:00|
|AAAAAABBBBB|VD        |2019-05-31 19:31:00|
|AAAAAABBBBB|VA        |2019-06-26 00:00:00|
|AAAAAABBBBB|E         |2019-06-26 02:00:00|
|AAAAAABBBBB|UV        |2019-06-29 00:00:00|
|AAAAAABBBBB|OA        |2019-07-01 00:00:00|
|AAAAAABBBBB|EE        |2019-07-03 01:00:00|
+-----------+----------+-------------------+

Ожидаемый результат

+-----------+----------+-------------------+
|key        |statusCode|statusTimestamp    |
+-----------+----------+-------------------+
|AAAAAABBBBB|UV        |2019-06-29 00:00:00|
|AAAAAABBBBB|OA        |2019-07-01 00:00:00|
+-----------+----------+-------------------+

Я знаю, что, скорее всего, смогу решить проблему, настроив данные подобным образом, но есть ли у кого-нибудь предложения о том, как решить описанный выше фильтр.

someDS
  .groupBy("key")
  .pivot("statusCode", Seq("UV", "OA"))
  .agg(collect_set($"statusTimestamp"))
  .thenSomeOtherStuff...

1 Ответ

1 голос
/ 10 июля 2019

В то время как подход groupBy/pivot хорошо группировал бы временные метки, для выполнения необходимой фильтрации потребовались бы нетривиальные шаги (скорее всего, UDF) с последующим повторным расширением. Вот другой подход со следующими шагами:

  1. Фильтровать набор данных только для statusCode "УФ" или "ОА"
  2. Для каждой строки используйте оконные функции для создания строки statusCode из previous, current, and next 2 rows
  3. Используйте Regex сопоставление с образцом для определения нужных строк

Пример кода ниже:

import java.sql.Timestamp
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Sample data:
//   key `A`: requirement #3
//   key `B`: requirement #2
//   key `C`: requirement #4  
val df = Seq(
  ("A", "OA", Timestamp.valueOf("2019-05-20 00:00:00")),
  ("A", "E",  Timestamp.valueOf("2019-05-30 00:00:00")),
  ("A", "UV", Timestamp.valueOf("2019-06-22 00:00:00")),
  ("A", "OA", Timestamp.valueOf("2019-07-01 00:00:00")),
  ("A", "OA", Timestamp.valueOf("2019-07-03 00:00:00")),
  ("B", "C",  Timestamp.valueOf("2019-06-15 00:00:00")),
  ("B", "OA", Timestamp.valueOf("2019-06-25 00:00:00")),
  ("C", "D",  Timestamp.valueOf("2019-06-01 00:00:00")),
  ("C", "OA", Timestamp.valueOf("2019-06-30 00:00:00")),
  ("C", "UV", Timestamp.valueOf("2019-07-02 00:00:00"))
).toDF("key", "statusCode", "statusTimestamp")

val win = Window.partitionBy("key").orderBy("statusTimestamp")

val df2 = df.
  where($"statusCode" === "UV" || $"statusCode" === "OA").
  withColumn("statusPrevCurrNext2", concat(
    coalesce(lag($"statusCode", 1).over(win), lit("")),
    lit("#"),
    $"statusCode",
    lit("#"),
    coalesce(lead($"statusCode", 1).over(win), lit("")),
    lit("#"),
    coalesce(lead($"statusCode", 2).over(win), lit(""))
  ))

Давайте посмотрим на df2 (результат шагов 1 и 2):

df2.show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp    |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |B  |OA        |2019-06-25 00:00:00|#OA##              |
// |C  |OA        |2019-06-30 00:00:00|#OA#UV#            | <-- Req #4: Ends with `#UV#`
// |C  |UV        |2019-07-02 00:00:00|OA#UV##            | <-- Req #4: Ends with `#UV##`
// |A  |OA        |2019-05-20 00:00:00|#OA#UV#OA          |
// |A  |UV        |2019-06-22 00:00:00|OA#UV#OA#OA        | <-- Req #3: Starts with `[^#]*#UV#`
// |A  |OA        |2019-07-01 00:00:00|UV#OA#OA#          | <-- Req #3: starts with `UV#`
// |A  |OA        |2019-07-03 00:00:00|OA#OA##            |
// +---+----------+-------------------+-------------------+

Применение шага 3:

df2.
  where($"statusPrevCurrNext2".rlike("^[^#]*#UV#.*|^UV#.*|.*#UV#+$")).
  orderBy("key", "statusTimestamp").
  show(false)
// +---+----------+-------------------+-------------------+
// |key|statusCode|statusTimestamp    |statusPrevCurrNext2|
// +---+----------+-------------------+-------------------+
// |A  |UV        |2019-06-22 00:00:00|OA#UV#OA#OA        |
// |A  |OA        |2019-07-01 00:00:00|UV#OA#OA#          |
// |C  |OA        |2019-06-30 00:00:00|#OA#UV#            |
// |C  |UV        |2019-07-02 00:00:00|OA#UV##            |
// +---+----------+-------------------+-------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...