Как отсортировать переменную в каждой группе в pyspark? - PullRequest
0 голосов
/ 14 мая 2018

Я пытаюсь отсортировать значение val, используя другой столбец ts для каждого id.

# imports
from pyspark.sql import functions as F
from pyspark.sql import SparkSession as ss
import pandas as pd

# create dummy data
pdf = pd.DataFrame( [['2',2,'cat'],['1',1,'dog'],['1',2,'cat'],['2',3,'cat'],['2',4,'dog']] ,columns=['id','ts','val'])
sdf = ss.createDataFrame( pdf )
sdf.show()

+---+---+---+
| id| ts|val|
+---+---+---+
|  2|  2|cat|
|  1|  1|dog|
|  1|  2|cat|
|  2|  3|cat|
|  2|  4|dog|
+---+---+---+

1 Ответ

0 голосов
/ 14 мая 2018

Вы можете агрегировать по id и сортировать по ts:

sorted_sdf = ( sdf.groupBy('id')
                  .agg( F.sort_array( F.collect_list( F.struct( F.col('ts'), F.col('val') ) ), asc = True)
                  .alias('sorted_col') )  
             )

sorted_sdf.show()

+---+--------------------+
| id|          sorted_col|
+---+--------------------+
|  1|  [[1,dog], [2,cat]]|
|  2|[[2,cat], [3,cat]...|
+---+--------------------+

Затем мы можем взорвать этот список:

explode_sdf = sorted_sdf.select( 'id' , F.explode( F.col('sorted_col') ).alias('sorted_explode') )

explode_sdf.show()

+---+--------------+
| id|sorted_explode|
+---+--------------+
|  1|       [1,dog]|
|  1|       [2,cat]|
|  2|       [2,cat]|
|  2|       [3,cat]|
|  2|       [4,dog]|
+---+--------------+

Разбить кортежи sorted_explodeна две части:

detupled_sdf = explode_sdf.select( 'id', 'sorted_explode.*' )

detupled_sdf.show()

+---+---+---+
| id| ts|val|
+---+---+---+
|  1|  1|dog|
|  1|  2|cat|
|  2|  2|cat|
|  2|  3|cat|
|  2|  4|dog|
+---+---+---+

Теперь наш исходный кадр данных отсортирован по ts для каждого id!

...