PySpark: группа чисел типа группы, основанная на последовательности наблюдений - PullRequest
2 голосов
/ 20 апреля 2019

Я пытаюсь определить порядок типов партий на основе порядка их поступления.

Я начинаю с этого кадра данных

+--------+-----+
|sequence|batch|
+————————+—————+
|       1|    a|
|       2|    a|
|       3|    a|
|       4|    a|
|       5|    b|
|       6|    b|
|       7|    b|
|       8|    a|
|       9|    a|
|      10|    a|
|      11|    c|
|      12|    c|
|      13|    c|
|      14|    c|
+———————-+---——+

Что я хочу сделать, это определить партии в порядке их поступления, как показано ниже.

+--------+-----++----------+
|sequence|batch|batch_order|
+————————+—————+————------—+
|       1|    a|          1|
|       2|    a|          1|
|       3|    a|          1|
|       4|    a|          1|
|       5|    b|          2|
|       6|    b|          2|
|       7|    b|          2|
|       8|    a|          3|
|       9|    a|          3|
|      10|    a|          3|
|      11|    c|          4|
|      12|    c|          4|
|      13|    c|          4|
|      14|    c|          4|
+———————-+---——+————-------+

Когда я группирую партии, все партии типа А группируются вместе. Мне нужны субпартии в порядке поступления.

Вот пример кода для создания тестовых данных.

from pyspark.sql import SparkSession
from pyspark.sql.types import  IntegerType
import pyspark.sql.functions as F
from pyspark.sql import Window


spark = SparkSession.builder.appName('test').master("local[*]").getOrCreate()

df = spark.createDataFrame([[1, 'a'],
 [2, 'a'],
 [3, 'a'],
 [4, 'a'],
 [5, 'b'],
 [6, 'b'],
 [7, 'b'],
 [8, 'a'],
 [9, 'a'],
 [10, 'a'],
 [11, 'c'],
 [12, 'c'],
 [13, 'c'],
 [14, 'c']], schema=['order', 'batch'])
df = df.withColumn('order', F.col("order").cast(IntegerType()))


Я пробовал это окно, но оно группируется по типу пакета, а не по последовательности пакета.

df1 = df.withColumn("row_num", F.row_number().over(Window.partitionBy("batch").orderBy("order")))

df1.show()

+-----+-----+-------+
|order|batch|row_num|
+-----+-----+-------+
|   11|    c|      1|
|   12|    c|      2|
|   13|    c|      3|
|   14|    c|      4|
|    5|    b|      1|
|    6|    b|      2|
|    7|    b|      3|
|    1|    a|      1|
|    2|    a|      2|
|    3|    a|      3|
|    4|    a|      4|
|    8|    a|      5|
|    9|    a|      6|
|   10|    a|      7|
+-----+-----+-------+

1 Ответ

0 голосов
/ 20 апреля 2019

Один из способов - использовать оконную функцию lag () для получения предыдущего значения batch, а затем сравнить его с текущим batch, использовать этот флаг для накопления суммы.

from pyspark.sql import functions as F, Window

# set up the Window Spec
# note: partitionBy(F.lit(0)) just to bypass the WARN message
win = Window.partitionBy(F.lit(0)).orderBy('sequence')

# get the prev_batch, 
# set up the flag based on: batch == prev_batch ? 0 : 1
# batch_order is the running sum with the column-flag
df.withColumn('prev_batch', F.lag('batch').over(win)) \
  .withColumn('flag', F.when(F.col('batch') == F.col('prev_batch'),0).otherwise(1)) \
  .withColumn('batch_order', F.sum('flag').over(win)) \
  .drop('prev_batch', 'flag') \
  .sort('sequence') \
  .show()
#+--------+-----+-----------+
#|sequence|batch|batch_order|
#+--------+-----+-----------+
#|       1|    a|          1|
#|       2|    a|          1|
#|       3|    a|          1|
#|       4|    a|          1|
#|       5|    b|          2|
#|       6|    b|          2|
#|       7|    b|          2|
#|       8|    a|          3|
#|       9|    a|          3|
#|      10|    a|          3|
#|      11|    c|          4|
#|      12|    c|          4|
#|      13|    c|          4|
#|      14|    c|          4|
#+--------+-----+-----------+
...