Spark SQL: отставание оконной функции до выполнения условия - PullRequest
0 голосов
/ 09 апреля 2019

Я обрабатываю этот набор данных в spark:

+------------+------------+------------+
|     ColumnA|     ColumnB|     Result |
+------------+------------+------------+
|      ABCDEF|    MNOPQRST|      true  |
|      123455|      UVWXYZ|      false |
|      ABCDEF|    MNOPQRST|      false | (should be true)
|      123455|      UVWXYZ|      false |
|      123455|      UVWXYZ|      false |
|      ABCDEF|    EFGHIJK |      false |
+------------+------------+------------+

Правила таковы:

  1. Если для данного раздела задан ранг 1, установите Result как true.
  2. если ранг не 1 и ColumnA значение 123455 установить Result значение как false
  3. если ранг не 1 и ColumnA значение не 123455 и если значение ColumnB совпадает со значением ColumnB в предыдущей строке, установите для Result значение true.Убедитесь, что значение ColumnA предыдущей строки не равно 123455

    WindowSpec w = Window.partitionBy ("ColumnA, ColumnB");

    Column matchColumnB = functions.col ("ColumnB").equalTo (functions.lag ("ColumnB", 1) .over (w));

Здесь оконная функция проверяет предыдущую строку без учета значения ColumnA предыдущей строки.

Например, в приведенном выше наборе данных значение ColumnB строки 3 следует сравнивать с Row1, а не с Row2.

Я пытался посмотреть Window.unboundedPreceding, но не уверен, как его использовать в этом сценарии.

Есть ли способ достичь этого?

1 Ответ

1 голос
/ 09 апреля 2019

Для копирования DF:

val df = sc.parallelize(List(("ABCDEF","MNOPQRST"), 
                    ("123455","UVWXYZ"),
                    ("ABCDEF","MNOPQRST"),
                    ("123455","UVWXYZ"),
                    ("123455","UVWXYZ"), 
                    ("ABCDEF","EFGHIJK")))
   .toDF("ColumnA","ColumnB")

В предоставленной информации есть некоторые противоречия, например, ваша оконная реализация делает невозможным применение упомянутых условий.

Есть несколько важных моментов для оконной аналитики, когда речь идет о работе на основе порядка строк [ранга и сравнения с предыдущей строкой]

  1. Вам необходимо определить соответствующие столбцы разделов. Если окно разделено на columnA и columnB, то их значения останутся такими же для данного окна. Таким образом, если необходимо сравнить columnA и columnB между строками lead или lag, тогда DF необходимо разделить на какой-нибудь другой столбец. Пример, показывающий, почему это проблема

    val w = Window.partitionBy("ColumnA", "ColumnB").orderBy("ColumnA", "ColumnB");
    df.withColumn("rank", rank.over(w)).show
    +-------+--------+----+
    |ColumnA| ColumnB|rank|
    +-------+--------+----+
    | ABCDEF| EFGHIJK|   1|
    | ABCDEF|MNOPQRST|   1|
    | ABCDEF|MNOPQRST|   1|
    | 123455|  UVWXYZ|   1|
    | 123455|  UVWXYZ|   1|
    | 123455|  UVWXYZ|   1|
    +-------+--------+----+
    

    Каждая строка теперь действует как отдельное окно. Обратите внимание на порядок, это объясняется в пункте 2.

  2. Существует также необходимость в конкретном order by утверждении в оконном процессе. Без этого rank, «отставание», «опережение» и т. Д. Становятся недетерминированными и, следовательно, не имеют большого смысла. Spark пытается защититься от этого, и оконные функции будут генерировать исключение, если нет предложения order by. Пример, показывающий, почему это проблема

    val w = Window.partitionBy("ColumnA", "ColumnB")
    df.withColumn("result", lag("columnB", 1).over(w))
    

    Приводит к:

    org.apache.spark.sql.AnalysisException: Window function lag('columnB, 1, null) requires window to be ordered, please add ORDER BY clause. For example SELECT lag('columnB, 1, null)(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
    

Решение Чтобы ответить на сам вопрос: я собираюсь рассмотреть еще два столбца для вашего вопроса.

val df = sc.parallelize(List(("ABCDEF","MNOPQRST", "P1", "1"), 
                        ("123455","UVWXYZ", "P1", "2"),
                        ("ABCDEF","MNOPQRST", "P1", "3"),
                        ("123455","UVWXYZ", "P1", "4"),
                        ("123455","UVWXYZ", "P1", "5"), 
                        ("BLABLAH","UVWXYZ", "P1", "6"),
                        ("ABCDEF","EFGHIJK", "P1", "7")))
       .toDF("ColumnA","ColumnB", "ColumnP", "ColumnO")
+-------+--------+-------+-------+
|ColumnA| ColumnB|ColumnP|ColumnO|
+-------+--------+-------+-------+
| ABCDEF|MNOPQRST|     P1|      1|
| 123455|  UVWXYZ|     P1|      2|
| ABCDEF|MNOPQRST|     P1|      3|
| 123455|  UVWXYZ|     P1|      4|
| 123455|  UVWXYZ|     P1|      5|
|BLABLAH|  UVWXYZ|     P1|      5|
| ABCDEF| EFGHIJK|     P1|      6|
+-------+--------+-------+-------+

Здесь столбец разделения - columnP, а порядок по столбцу - ColumnO

val w = Window.partitionBy("ColumnP").orderBy("ColumnO")
val dfWithWindowing = df.withColumn("lag_columnB", lag("columnB", 1).over(w))
                        .withColumn("rank", rank().over(w))
dfWithWindowing.show
+-------+--------+-------+-------+-----------+----+
|ColumnA| ColumnB|ColumnP|ColumnO|lag_columnB|rank|
+-------+--------+-------+-------+-----------+----+
| ABCDEF|MNOPQRST|     P1|      1|       null|   1|
| 123455|  UVWXYZ|     P1|      2|   MNOPQRST|   2|
| ABCDEF|MNOPQRST|     P1|      3|     UVWXYZ|   3|
| 123455|  UVWXYZ|     P1|      4|   MNOPQRST|   4|
| 123455|  UVWXYZ|     P1|      5|     UVWXYZ|   5|
|BLABLAH|  UVWXYZ|     P1|      6|     UVWXYZ|   6|
| ABCDEF| EFGHIJK|     P1|      7|     UVWXYZ|   7|
+-------+--------+-------+-------+-----------+----+

Теперь у нас есть вся информация, необходимая для выполнения необходимых вычислений. В правиле нет значения для значения результата, если оно не соответствует ни одному из условий, реализация считает это истинным.

val resultDF = dfWithWindowing.withColumn("result", when($"rank"==="1",true).otherwise(
                              when($"ColumnA"==="123455", false).otherwise(
                                    when($"ColumnB"===$"lag_columnB", true).otherwise(true)
                                 )
                              )
                          ).drop("ColumnP", "ColumnO","lag_columnB","rank")
+-------+--------+------+
|ColumnA| ColumnB|result|
+-------+--------+------+
| ABCDEF|MNOPQRST|  true|
| 123455|  UVWXYZ| false|
| ABCDEF|MNOPQRST|  true|
| 123455|  UVWXYZ| false|
| 123455|  UVWXYZ| false|
|BLABLAH|  UVWXYZ|  true|
| ABCDEF| EFGHIJK|  true|
+-------+--------+------+

Чтобы узнать больше о работе с окнами, пожалуйста, обратитесь к https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

...