PySpark: collect_set для столбца данных на основе порядка другого столбца - PullRequest
0 голосов
/ 21 октября 2019

У меня есть кадр данных Spark, который выглядит примерно так:

id  country  date        action
 1    A   2019-01-01   suppress
 1    A   2019-01-02   suppress
 2    A   2019-01-03   bid-up
 2    A   2019-01-04   bid-down
 3    C   2019-01-01   no-action
 3    C   2019-01-02   bid-up
 4    D   2019-01-01   suppress

Я хочу уменьшить этот кадр данных, сгруппировав его по id, стране и собрав уникальные значения столбца 'action' в массив, но этот массив должен быть упорядочен по столбцу даты.

Например,

id  country action_arr
 1    A      [suppress]
 2    A      [bid-up, bid-down]
 3    C      [no-action, bid-up]
 4    D      [suppress]

Чтобы объяснить это немного более кратко, у меня есть некоторый код SQL (presto), который делает именно то, что я хочу. .. я просто пытаюсь сделать это в PySpark или SparkSQL:

SELECT id, country, array_distinct(array_agg(action ORDER BY date ASC)) AS actions
FROM table
GROUP BY id, country

Теперь вот моя попытка в PySpark:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('action').orderBy('date')

sorted_list_df = df.withColumn('sorted_list', F.collect_set('action').over(w))

Тогда я хочу узнать количество случаевкаждого набора действий по группам:

df = sorted_list_df.select('country', 'sorted_list').groupBy('coutry', 'sorted_list').agg(F.count('sorted_list'))

Код выполняется, но в выходном столбце sorted_list он в основном совпадает с действием без агрегации массива. Может ли кто-нибудь помочь?

ПРАВИТЬМне удалось получить то, что я хочу ... но результаты не полностью соответствуют результатам до. Кто-нибудь может объяснить, почему? Решение ниже:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('action').orderBy('date')

df_2 = df.withColumn("sorted_list", F.collect_set("action").over(Window.partitionBy("id").orderBy("date")))

test = df_2.select('id', 'country', 'sorted_list')\
           .dropDuplicates()\
           .select('country', 'sorted_list')\
           .groupBy('site_name', 'sorted_list')\
           .agg(F.count('sorted_list'))

1 Ответ

0 голосов
/ 21 октября 2019

IMO, ваше определение окна неверно. Вы должны разделить по столбцу, с которым вы хотите сделать группы, а затем собрать набор уникальных значений для каждой группы.

IIUC, вам просто нужно сделать:

w = Window.partitionBy(['id', 'country']).orderBy('date')

sorted_list_df = df.withColumn('sorted_list', F.collect_set('action').over(w))

df_new = sorted_list_df.select('id', 'country', 'sorted_list').withColumn("count_of_elems", F.size("sorted_list"))

DRAWBACK :

Если вы используете окно, у вас будет новый набор длякаждый ряд, и количество ваших строк будет таким же, как старый DF. Агрегации как таковой не будет, так как я не думаю, что это тоже не то, что вам нужно.

В следующей строке агрегируются значения в виде набора для каждой группы. Я надеюсь, что это даст вам именно то, что вы хотите:

df_new = sorted_list_df.groupby('id', 'country').agg(F.max('sorted_list').alias('sorted_list')).withColumn("count_of_elems", F.size("sorted_list"))
...