Вы можете сделать это с помощью пары операторов groupBy,
Для начала у вас есть такой фрейм данных,
+-------------------+---------+---------------+
| date| flat| list|
+-------------------+---------+---------------+
|2014-01-01 00:00:00| A;A;B| [A, A, B]|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]|
|2014-01-03 00:00:00| H;X| [H, X]|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]|
|2014-01-05 00:00:00| S;T;U| [S, T, U]|
|2014-01-06 00:00:00| G;C;G| [G, C, G]|
+-------------------+---------+---------------+
Разнесите столбцы list
, используя F.explode
, как this,
new_frame_exp = new_frame.withColumn("exp", F.explode('list'))
Тогда ваш фрейм данных будет выглядеть следующим образом,
+-------------------+---------+---------------+---+
| date| flat| list|exp|
+-------------------+---------+---------------+---+
|2014-01-01 00:00:00| A;A;B| [A, A, B]| A|
|2014-01-01 00:00:00| A;A;B| [A, A, B]| A|
|2014-01-01 00:00:00| A;A;B| [A, A, B]| B|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| D|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| P|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| E|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| P|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| P|
|2014-01-03 00:00:00| H;X| [H, X]| H|
|2014-01-03 00:00:00| H;X| [H, X]| X|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| P|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| Q|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| G|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| S|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| T|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| U|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| G|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| C|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| G|
+-------------------+---------+---------------+---+
На этом фрейме данных создайте группу подобным образом,
new_frame_exp_agg = new_frame_exp.groupBy('date', 'flat', 'list', 'exp').count()
Затем вы будет иметь такой фрейм данных,
+-------------------+---------+---------------+---+-----+
| date| flat| list|exp|count|
+-------------------+---------+---------------+---+-----+
|2014-01-03 00:00:00| H;X| [H, X]| H| 1|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| G| 1|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| U| 1|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| T| 1|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| P| 1|
|2014-01-03 00:00:00| H;X| [H, X]| X| 1|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| G| 2|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| E| 1|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| C| 1|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| S| 1|
|2014-01-01 00:00:00| A;A;B| [A, A, B]| B| 1|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| D| 1|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| Q| 1|
|2014-01-01 00:00:00| A;A;B| [A, A, B]| A| 2|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| P| 3|
+-------------------+---------+---------------+---+-----+
. На этом фрейме данных примените еще один уровень агрегации, чтобы собрать счетчики для перечисления и найти максимальное значение, как это,
res = new_frame_exp_agg.groupBy('date', 'flat', 'list').agg(
F.collect_list('count').alias('occurances'),
F.max('count').alias('max'))
res.orderBy('date').show()
+-------------------+---------+---------------+----------+---+
| date| flat| list|occurances|max|
+-------------------+---------+---------------+----------+---+
|2014-01-01 00:00:00| A;A;B| [A, A, B]| [2, 1]| 2|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| [1, 1, 3]| 3|
|2014-01-03 00:00:00| H;X| [H, X]| [1, 1]| 1|
|2014-01-04 00:00:00| P;Q;G| [P, Q, G]| [1, 1, 1]| 1|
|2014-01-05 00:00:00| S;T;U| [S, T, U]| [1, 1, 1]| 1|
|2014-01-06 00:00:00| G;C;G| [G, C, G]| [1, 2]| 2|
+-------------------+---------+---------------+----------+---+
Если вы хотите столбец occurance
отсортирован, вы можете использовать F.array_sort
над столбцом, если вы используете версию 2.4+, иначе вам нужно написать для этого udf.