Spark SQL: получить значение столбца, когда другой столбец является максимальным значением внутри groupBy (). Agg () - PullRequest
0 голосов
/ 02 мая 2020

У меня есть кадр данных, который выглядит следующим образом:

root
 |-- value: int (nullable = true)
 |-- date: date (nullable = true)

Я хотел бы вернуть значение, где значение - самая последняя дата в кадре данных. Меняется ли эта проблема, если мне нужно сделать groupBy и agg? Моя настоящая проблема выглядит так:

val result = df
.filter(df("date")>= somedate && df("date")<= some other date)
.groupBy(valueFromColumn1)
.agg(
    max(date),
    min(valueFromColumn2),
    Here I want to put valueFromColumn4 where date is max after the filter
 )

Я знаю, что могу получить эти значения, создав второй фрейм данных, а затем сделав объединение. Но я бы хотел избежать операции объединения, если это возможно.

Пример ввода:

Column 1 | Column 2 | Date | Column 4
    A         1       2006      5
    A         5       2018      2
    A         3       2000      3
    B         13      2007      4

Вывод же (фильтр: дата> = 2006, дата <= 2018): </p>

Column 1 | Column 2 | Date | Column 4
    A         1       2018      2  <- I got 2 from the first row which has the highest date
    B         13      2007      4

Ответы [ 3 ]

2 голосов
/ 02 мая 2020

вы можете использовать либо groupBy с struct:

df
  .groupBy()
  .agg(max(struct($"date",$"value")).as("latest"))
  .select($"latest.*")

или с Window:

df
  .withColumn("rnk",row_number().over(Window.orderBy($"date".desc)))
  .where($"rnk"===1).drop($"rnk")
2 голосов
/ 02 мая 2020

Решением будет использование struct для связывания значения и даты вместе. Это будет выглядеть так:

val result = df
  .filter(df("date")>= somedate && df("date")<= some other date)
  .withColumn("s", struct(df("date") as "date", df(valueFromColumn4) as "value"))
  .groupBy(valueFromColumn1)
  .agg(
     // since date is the first value of the struct,
     // this selects the tuple that maximizes date, and the associated value.
     max(col("s")) as "s", 
     min(col(valueFromColumn2)),
  )
  .withColumn("date", col("s.date"))
  .withColumn(valueFromColumn4, col("s.value"))
1 голос
/ 03 мая 2020

Операция, которую вы хотите выполнить, - это упорядочение в группе данных (здесь сгруппировано по столбцу 1). Это идеальный вариант использования оконной функции , которая выполняет вычисление для группы записей (окна).

Здесь мы можем разбить окно на столбец 1 и выбрать максимум даты для каждого такое окно. Давайте определим windowedPartition как:

val windowedPartition = Window.partitionBy("col1").orderBy(col("date").desc)

Затем мы можем применить эту оконную функцию к нашему набору данных, чтобы выбрать строку с самым высоким рангом. (Я не добавил логи фильтрации c в приведенном ниже коде, так как я думаю, что здесь нет сложности и это не повлияет на решение)

Рабочий код:

    scala> import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.expressions.Window


    scala> val data = Seq(("a" , 1, 2006, 5), ("a", 5, 2018, 2), ("a", 3, 2000, 3), ("b", 13, 2007, 4)).toDF("col1", "col2", "date", "col4")
    data: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 2 more fields]


    scala> data.show
    +----+----+----+----+
    |col1|col2|date|col4|
    +----+----+----+----+
    |   a|   1|2006|   5|
    |   a|   5|2018|   2|
    |   a|   3|2000|   3|
    |   b|  13|2007|   4|
    +----+----+----+----+      

    scala> val windowedPartition = Window.partitionBy("col1").orderBy(col("date").desc)
    windowedPartition: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@39613474

    scala> data.withColumn("row_number", row_number().over(windowedPartition)).show
    +----+----+----+----+----------+
    |col1|col2|date|col4|row_number|
    +----+----+----+----+----------+
    |   b|  13|2007|   4|         1|
    |   a|   5|2018|   2|         1|
    |   a|   1|2006|   5|         2|
    |   a|   3|2000|   3|         3|
    +----+----+----+----+----------+


    scala> data.withColumn("row_number", row_number().over(windowedPartition)).where(col("row_number") === 1).show
    +----+----+----+----+----------+
    |col1|col2|date|col4|row_number|
    +----+----+----+----+----------+
    |   b|  13|2007|   4|         1|
    |   a|   5|2018|   2|         1|
    +----+----+----+----+----------+


    scala> data.withColumn("row_number", row_number().over(windowedPartition)).where(col("row_number") === 1).drop(col("row_number")).show
    +----+----+----+----+
    |col1|col2|date|col4|
    +----+----+----+----+
    |   b|  13|2007|   4|
    |   a|   5|2018|   2|
    +----+----+----+----+

I Полагаю, что это будет более масштабируемым решением, чем struct, поскольку, если число столбцов увеличивается, нам, возможно, придется добавить эти столбцы и в struct, и в этом решении этот случай будет решен.

Хотя один вопрос - в вашем o / p значение в col2 должно быть 5 (для col1 = A), верно? Как значение col2 меняется на 1?

...