Spark row_number partitionBy без порядка, чтобы сохранить естественный порядок - PullRequest
0 голосов
/ 07 августа 2020

Я бы хотел реализовать разделение без какого-либо порядка по, чтобы данные могли сохранять свою естественную сортировку во фрейме данных. Пожалуйста, поделитесь любым советом, заранее спасибо.

Учтите, что во фрейме данных Spark есть следующие данные

         raw data
----------------------------
 name | item id |   action
----------------------------
 John |    120  |   sell 
----------------------------
 John |    320  |   buy
----------------------------
 Jane |    120  |   sell 
----------------------------
 Jane |    450  |   buy
----------------------------
 Sam  |    360  |   sell 
----------------------------
 Sam  |    300  |   hold
----------------------------
 Sam  |    450  |   buy
----------------------------
 Tim  |    470  |   buy
----------------------------

В этой схеме таблицы есть пара правил

1. Every one has at least one action `buy`
2. Every one's last action must be `buy` as well

Теперь я хотел бы добавить столбец последовательности, просто чтобы показать порядок действий для всех

            expectation
--------------------------------------
 name | item id |   action  |  seq   
--------------------------------------
 John |    120  |   sell    |  1
--------------------------------------
 John |    320  |   buy     |  2
--------------------------------------
 Jane |    120  |   sell    |  1
--------------------------------------
 Jane |    450  |   buy     |  2
--------------------------------------
 Sam  |    360  |   sell    |  1
--------------------------------------
 Sam  |    300  |   hold    |  2
--------------------------------------
 Sam  |    450  |   buy     |  3
--------------------------------------
 Tim  |    470  |   buy     |  1
--------------------------------------

так вот мой код

import org.apache.spark.sql.functions.{row_number}
import org.apache.spark.sql.expressions.Window
....

val df = spark.read.json(....)
val spec = Window.partitionBy($"name").orderBy(lit(1))         <-- don't know what to used for order by

val dfWithSeq = df.withColumn("seq", row_number.over(spec))   <--- please show me the magic

Интересно, результат вернулся из dfWithSeq, показывает, что действиям каждого человека была присвоена случайная последовательность, поэтому с помощью seq действие больше не соответствует порядку, указанному в исходной таблице данных. Однако я не смог найти решения.

           actual result
--------------------------------------
 name | item id |   action  |  seq   
--------------------------------------
 John |    120  |   sell    |  1
--------------------------------------
 John |    320  |   buy     |  2
--------------------------------------
 Jane |    120  |   sell    |  2          <-- this is wrong
--------------------------------------
 Jane |    450  |   buy     |  1          <-- this is wrong
--------------------------------------
 Sam  |    360  |   sell    |  1
--------------------------------------
 Sam  |    300  |   hold    |  2
--------------------------------------
 Sam  |    450  |   buy     |  3
--------------------------------------
 Tim  |    470  |   buy     |  1
--------------------------------------

Ответы [ 2 ]

1 голос
/ 07 августа 2020

Используйте monotonically_increasing_id.

import org.apache.spark.sql.functions.{row_number, monotonically_increasing_id}
import org.apache.spark.sql.expressions.Window
....

val df = spark.read.json(....)
val spec = Window.partitionBy($"name").orderBy($"order")

val dfWithSeq = df.withColumn("order", monotonically_increasing_id)
  .withColumn("seq", row_number.over(spec))
1 голос
/ 07 августа 2020

Необходимо использовать:

  • zipWithIndex после преобразования в RDD и обратно в DF. Это узкое преобразование, которое сохранит ваш (начальный) порядок данных.
  • затем вы разбиваете должным образом с учетом этого порядкового номера в имени или чего-либо еще.

Оставьте вас чтобы проработать остальное.

...