Для копирования DF:
val df = sc.parallelize(List(("ABCDEF","MNOPQRST"),
("123455","UVWXYZ"),
("ABCDEF","MNOPQRST"),
("123455","UVWXYZ"),
("123455","UVWXYZ"),
("ABCDEF","EFGHIJK")))
.toDF("ColumnA","ColumnB")
В предоставленной информации есть некоторые противоречия, например, ваша оконная реализация делает невозможным применение упомянутых условий.
Есть несколько важных моментов для оконной аналитики, когда речь идет о работе на основе порядка строк [ранга и сравнения с предыдущей строкой]
Вам необходимо определить соответствующие столбцы разделов. Если окно разделено на columnA
и columnB
, то их значения останутся такими же для данного окна. Таким образом, если необходимо сравнить columnA
и columnB
между строками lead
или lag
, тогда DF необходимо разделить на какой-нибудь другой столбец. Пример, показывающий, почему это проблема
val w = Window.partitionBy("ColumnA", "ColumnB").orderBy("ColumnA", "ColumnB");
df.withColumn("rank", rank.over(w)).show
+-------+--------+----+
|ColumnA| ColumnB|rank|
+-------+--------+----+
| ABCDEF| EFGHIJK| 1|
| ABCDEF|MNOPQRST| 1|
| ABCDEF|MNOPQRST| 1|
| 123455| UVWXYZ| 1|
| 123455| UVWXYZ| 1|
| 123455| UVWXYZ| 1|
+-------+--------+----+
Каждая строка теперь действует как отдельное окно. Обратите внимание на порядок, это объясняется в пункте 2.
Существует также необходимость в конкретном order by
утверждении в оконном процессе. Без этого rank
, «отставание», «опережение» и т. Д. Становятся недетерминированными и, следовательно, не имеют большого смысла. Spark пытается защититься от этого, и оконные функции будут генерировать исключение, если нет предложения order by. Пример, показывающий, почему это проблема
val w = Window.partitionBy("ColumnA", "ColumnB")
df.withColumn("result", lag("columnB", 1).over(w))
Приводит к:
org.apache.spark.sql.AnalysisException: Window function lag('columnB, 1, null) requires window to be ordered, please add ORDER BY clause. For example SELECT lag('columnB, 1, null)(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
Решение
Чтобы ответить на сам вопрос: я собираюсь рассмотреть еще два столбца для вашего вопроса.
val df = sc.parallelize(List(("ABCDEF","MNOPQRST", "P1", "1"),
("123455","UVWXYZ", "P1", "2"),
("ABCDEF","MNOPQRST", "P1", "3"),
("123455","UVWXYZ", "P1", "4"),
("123455","UVWXYZ", "P1", "5"),
("BLABLAH","UVWXYZ", "P1", "6"),
("ABCDEF","EFGHIJK", "P1", "7")))
.toDF("ColumnA","ColumnB", "ColumnP", "ColumnO")
+-------+--------+-------+-------+
|ColumnA| ColumnB|ColumnP|ColumnO|
+-------+--------+-------+-------+
| ABCDEF|MNOPQRST| P1| 1|
| 123455| UVWXYZ| P1| 2|
| ABCDEF|MNOPQRST| P1| 3|
| 123455| UVWXYZ| P1| 4|
| 123455| UVWXYZ| P1| 5|
|BLABLAH| UVWXYZ| P1| 5|
| ABCDEF| EFGHIJK| P1| 6|
+-------+--------+-------+-------+
Здесь столбец разделения - columnP
, а порядок по столбцу - ColumnO
val w = Window.partitionBy("ColumnP").orderBy("ColumnO")
val dfWithWindowing = df.withColumn("lag_columnB", lag("columnB", 1).over(w))
.withColumn("rank", rank().over(w))
dfWithWindowing.show
+-------+--------+-------+-------+-----------+----+
|ColumnA| ColumnB|ColumnP|ColumnO|lag_columnB|rank|
+-------+--------+-------+-------+-----------+----+
| ABCDEF|MNOPQRST| P1| 1| null| 1|
| 123455| UVWXYZ| P1| 2| MNOPQRST| 2|
| ABCDEF|MNOPQRST| P1| 3| UVWXYZ| 3|
| 123455| UVWXYZ| P1| 4| MNOPQRST| 4|
| 123455| UVWXYZ| P1| 5| UVWXYZ| 5|
|BLABLAH| UVWXYZ| P1| 6| UVWXYZ| 6|
| ABCDEF| EFGHIJK| P1| 7| UVWXYZ| 7|
+-------+--------+-------+-------+-----------+----+
Теперь у нас есть вся информация, необходимая для выполнения необходимых вычислений. В правиле нет значения для значения результата, если оно не соответствует ни одному из условий, реализация считает это истинным.
val resultDF = dfWithWindowing.withColumn("result", when($"rank"==="1",true).otherwise(
when($"ColumnA"==="123455", false).otherwise(
when($"ColumnB"===$"lag_columnB", true).otherwise(true)
)
)
).drop("ColumnP", "ColumnO","lag_columnB","rank")
+-------+--------+------+
|ColumnA| ColumnB|result|
+-------+--------+------+
| ABCDEF|MNOPQRST| true|
| 123455| UVWXYZ| false|
| ABCDEF|MNOPQRST| true|
| 123455| UVWXYZ| false|
| 123455| UVWXYZ| false|
|BLABLAH| UVWXYZ| true|
| ABCDEF| EFGHIJK| true|
+-------+--------+------+
Чтобы узнать больше о работе с окнами, пожалуйста, обратитесь к https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html