Scala Spark Dataframe Создать новый столбец с максимумом предыдущего и текущего значения другого столбца - PullRequest
0 голосов
/ 06 мая 2020

У меня есть фрейм данных только с столбцом category и столбцом A, как показано ниже. Я хочу заполнить столбец B таким образом, чтобы он сравнивал текущее значение A и предыдущее значение B и сохранял максимальное значение для каждой категории. Пробовал с функцией Windows, лаги, макс категории и т.д. c. но самая большая проблема, с которой я столкнулся, - это как запомнить предыдущий максимум при сравнении двух значений.

 +---+--------+--+--+
 id |  category | A | B |
 +---+--------+--+--+
  1  Fruit   1   1
  2  Fruit   5   5
  3  Fruit   3   5 
  4  Fruit   4   5 
  1  Dessert 4   4
  2  Dessert 2   4
  1  Veggies 11  11
  2  Veggies 7   11
  3  Veggies 12  12
  4  Veggies 3   12
  ---+------+---+----+-

Ответы [ 2 ]

5 голосов
/ 06 мая 2020

Использование максимального значения A должно помочь:

df
 .withColumn("B", max($"A").over(Window.partitionBy($"category").orderBy($"id")))
2 голосов
/ 06 мая 2020

Мне было трудно выразить это с помощью Spark SQL, но я управлялся с помощью функционального программирования с использованием Dataset API

scala>   case class Food(category: String, a: Int, b: Option[Int] = None)
defined class Food

scala>     val ds = spark.createDataset(
     |       List(
     |         Food("Fruit", 1),
     |         Food("Fruit", 5),
     |         Food("Fruit", 3),
     |         Food("Fruit", 4),
     |         Food("Dessert", 4),
     |         Food("Dessert", 2),
     |         Food("Veggies", 11),
     |         Food("Veggies", 7),
     |         Food("Veggies", 12),
     |         Food("Veggies", 3)
     |       )
     |     )
ds: org.apache.spark.sql.Dataset[Food] = [category: string, a: int ... 1 more field]

scala> ds.show
+--------+---+----+
|category|  a|   b|
+--------+---+----+
|   Fruit|  1|null|
|   Fruit|  5|null|
|   Fruit|  3|null|
|   Fruit|  4|null|
| Dessert|  4|null|
| Dessert|  2|null|
| Veggies| 11|null|
| Veggies|  7|null|
| Veggies| 12|null|
| Veggies|  3|null|
+--------+---+----+


scala> :paste
// Entering paste mode (ctrl-D to finish)

    ds.groupByKey(_.category)
      .flatMapGroups { (key, iter) =>
        if (iter.hasNext) {
          val head = iter.next
          iter.scanLeft(head.copy(b = Some(head.a))) { (x, y) =>
            val a = x.b.map(b => if(x.a > b) x.a else b).getOrElse(x.a)
            y.copy(b = if(y.a > a) Some(y.a) else Some(a))
          }
        } else iter
      }
      .show

// Exiting paste mode, now interpreting.

+--------+---+---+
|category|  a|  b|
+--------+---+---+
| Veggies| 11| 11|
| Veggies|  7| 11|
| Veggies| 12| 12|
| Veggies|  3| 12|
| Dessert|  4|  4|
| Dessert|  2|  4|
|   Fruit|  1|  1|
|   Fruit|  5|  5|
|   Fruit|  3|  5|
|   Fruit|  4|  5|
+--------+---+---+
...