Как сохранить порядок сортировки в PySpark collect_list и собрать несколько списков - PullRequest
0 голосов
/ 08 ноября 2018

Я хочу сохранить порядок сортировки даты, используя collect_list для нескольких столбцов, все с одинаковым порядком дат.Они понадобятся мне в одном и том же кадре данных, чтобы их можно было использовать для создания входных данных модели временных рядов.Ниже приведен пример "train_data":

enter image description here

Я использую Окно с PartitionBy для обеспечения порядка сортировки с помощью tuning_evnt_start_dt для каждого Syscode_Stn.Я могу создать один столбец с этим кодом:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data
.withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)
           )\
.groupBy('Syscode_Stn')\
.agg(F.max('spp_imp_daily').alias('spp_imp_daily'))

но как мне создать два столбца в одном и том же новом фрейме данных?

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data
.withColumn('spp_imp_daily',F.collect_list('spp_imp_daily').over(w))
.withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))
.groupBy('Syscode_Stn')
.agg(F.max('spp_imp_daily').alias('spp_imp_daily')))

enter image description here

Обратите внимание, что MarchMadInd не показан на скриншоте, но включен в train_data.Объяснение того, как я попал туда, где я нахожусь: https://stackoverflow.com/a/49255498/8691976

1 Ответ

0 голосов
/ 08 ноября 2018

Да, правильный способ - добавить последовательные операторы .withColumn, за которыми следует оператор .agg, который удаляет дубликаты для каждого массива.

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data.withColumn('spp_imp_daily', 
F.collect_list('spp_imp_daily').over(w)
                                  )\
.withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))\

.groupBy('Syscode_Stn')\
.agg(F.max('spp_imp_daily').alias('spp_imp_daily'), 
 F.max('MarchMadInd').alias('MarchMadInd')
)
...