Как объединить столбцы в предыдущей строке в фрейме данных? - PullRequest
1 голос
/ 28 мая 2020

У меня есть такой фрейм данных:

case class CC(id: String, p2: Double, p3: Double, time: Int)
val df = List(
  CC("a", 1.1d, 2.2d, 1),
  CC("b", 3.3d, 4.4d, 2),
  CC("c", 5.5d, 6.6d, 3)).toDF

+---+---+---+----+
| id| p2| p3|time|
+---+---+---+----+
|  a|1.1|2.2|   1|
|  b|3.3|4.4|   2|
|  c|5.5|6.6|   3|
+---+---+---+----+

Я хочу объединить p2 и p3 предыдущей строки и поместить в столбец p5 и объединить p2 и p3 из текущую строку и поместить в столбец p6. Чтобы получить:

    +---+---+---+----+---------+---------+
    | id| p2| p3|time| p5      | p6      |
    +---+---+---+----+---------+---------+
    |  a|1.1|2.2|   1|         |1.1: 2.2 |
    |  b|3.3|4.4|   2|1.1: 2.2 |3.3: 4.4 |
    |  c|5.5|6.6|   3|3.3: 4.4 |5.5: 6.6 |
    +---+---+---+----+---------+---------+

Для текущей строки, т.е. p6 я могу легко использовать

.withColumn("p6", concat(col("p2"), col("p3")))

, а для предыдущей строки я подумал об использовании оконной функции и lag как показано ниже, но это не работает.

val wf = Window.partitionBy("id").orderBy("time")
df.withColumn("p5", concat(lag(col("p2"), 1) + lag("p3", 1)).over(w))

Но я получаю сообщение об ошибке, что выражение concat... не поддерживается в оконной функции. Некоторые StackOverflow ответы говорят об использовании определяемой пользователем агрегатной функции, но я не смог найти простого примера, которому я мог бы следовать.

Любое объяснение этой проблемы приветствуется. Если вы знаете, предложите альтернативные методы решения этой проблемы. Спасибо!

1 Ответ

3 голосов
/ 28 мая 2020

Я приведу аналогичный пример, но не такой, как у вас ... если вы хотите применить concat к 2 столбцам с задержкой, вы можете go на 2 шага, как показано ниже ... 1) применить функции задержки 2) затем concat.

вы не можете применить concat к 2 столбцам с задержкой одновременно ...

   import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions._

  var customers = spark.sparkContext.parallelize(List(("Alice", "click","item_8", 50),
    ("Alice", "view","item_2", 55),
    ("Alice", "share","item_11", 100),
    ("Bob", "view","item_11", 25),
    ("Bob", "share","ietm_2", 50),
    ("Bob", "view", "item_8",65))).toDF("name", "event", "item", "time")
  customers.show

  val wSpec3 = Window.partitionBy("name").orderBy("time")
  customers.withColumn(
    "prev_event", lag(col("event"),1).over(wSpec3)
  ).withColumn(
    "prev_item", lag(col("item"),1).over(wSpec3)
  ).withColumn(
    "prev_time", lag(col("time"),1).over(wSpec3)
  ).withColumn("newcolumn", concat( 'prev_event, 'prev_item)).show

Результат:

+-----+-----+-------+----+
| name|event|   item|time|
+-----+-----+-------+----+
|Alice|click| item_8|  50|
|Alice| view| item_2|  55|
|Alice|share|item_11| 100|
|  Bob| view|item_11|  25|
|  Bob|share| ietm_2|  50|
|  Bob| view| item_8|  65|
+-----+-----+-------+----+

+-----+-----+-------+----+----------+---------+---------+-----------+
| name|event|   item|time|prev_event|prev_item|prev_time|  newcolumn|
+-----+-----+-------+----+----------+---------+---------+-----------+
|  Bob| view|item_11|  25|      null|     null|     null|       null|
|  Bob|share| ietm_2|  50|      view|  item_11|       25|viewitem_11|
|  Bob| view| item_8|  65|     share|   ietm_2|       50|shareietm_2|
|Alice|click| item_8|  50|      null|     null|     null|       null|
|Alice| view| item_2|  55|     click|   item_8|       50|clickitem_8|
|Alice|share|item_11| 100|      view|   item_2|       55| viewitem_2|
+-----+-----+-------+----+----------+---------+---------+-----------+
...