Операция, которую вы хотите выполнить, - это упорядочение в группе данных (здесь сгруппировано по столбцу 1). Это идеальный вариант использования оконной функции , которая выполняет вычисление для группы записей (окна).
Здесь мы можем разбить окно на столбец 1 и выбрать максимум даты для каждого такое окно. Давайте определим windowedPartition как:
val windowedPartition = Window.partitionBy("col1").orderBy(col("date").desc)
Затем мы можем применить эту оконную функцию к нашему набору данных, чтобы выбрать строку с самым высоким рангом. (Я не добавил логи фильтрации c в приведенном ниже коде, так как я думаю, что здесь нет сложности и это не повлияет на решение)
Рабочий код:
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> val data = Seq(("a" , 1, 2006, 5), ("a", 5, 2018, 2), ("a", 3, 2000, 3), ("b", 13, 2007, 4)).toDF("col1", "col2", "date", "col4")
data: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 2 more fields]
scala> data.show
+----+----+----+----+
|col1|col2|date|col4|
+----+----+----+----+
| a| 1|2006| 5|
| a| 5|2018| 2|
| a| 3|2000| 3|
| b| 13|2007| 4|
+----+----+----+----+
scala> val windowedPartition = Window.partitionBy("col1").orderBy(col("date").desc)
windowedPartition: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@39613474
scala> data.withColumn("row_number", row_number().over(windowedPartition)).show
+----+----+----+----+----------+
|col1|col2|date|col4|row_number|
+----+----+----+----+----------+
| b| 13|2007| 4| 1|
| a| 5|2018| 2| 1|
| a| 1|2006| 5| 2|
| a| 3|2000| 3| 3|
+----+----+----+----+----------+
scala> data.withColumn("row_number", row_number().over(windowedPartition)).where(col("row_number") === 1).show
+----+----+----+----+----------+
|col1|col2|date|col4|row_number|
+----+----+----+----+----------+
| b| 13|2007| 4| 1|
| a| 5|2018| 2| 1|
+----+----+----+----+----------+
scala> data.withColumn("row_number", row_number().over(windowedPartition)).where(col("row_number") === 1).drop(col("row_number")).show
+----+----+----+----+
|col1|col2|date|col4|
+----+----+----+----+
| b| 13|2007| 4|
| a| 5|2018| 2|
+----+----+----+----+
I Полагаю, что это будет более масштабируемым решением, чем struct, поскольку, если число столбцов увеличивается, нам, возможно, придется добавить эти столбцы и в struct, и в этом решении этот случай будет решен.
Хотя один вопрос - в вашем o / p значение в col2 должно быть 5 (для col1 = A), верно? Как значение col2 меняется на 1?