pyspark: подсчитать количество вхождений отдельных элементов в списки - PullRequest
3 голосов
/ 12 апреля 2020

Мне нужны следующие данные:

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
     'flat': ['A;A;B', 'D;P;E;P;P', 'H;X', 'P;Q;G', 'S;T;U', 'G;C;G']}

data['date'] = pd.to_datetime(data['date'])

data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "500g") \
    .appName('my-pandasToSparkDF-app') \
    .getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.sparkContext.setLogLevel("OFF")

df=spark.createDataFrame(data)
new_frame = df.withColumn("list", F.split("flat", "\;"))

enter image description here

Я хотел бы добавить новый столбец, который содержит количество вхождений каждого отдельного элемент (отсортированный в порядке возрастания) и другой столбец, который содержит максимум:

+-------------------+-----------+---------------------+-----------+----+
|               date| flat      | list                |occurrences|max |
+-------------------+-----------+---------------------+-----------+----+
|2014-01-01 00:00:00|A;A;B      |['A','A','B']        |[1,2]      |2   |
|2014-01-02 00:00:00|D;P;E;P;P  |['D','P','E','P','P']|[1,1,3]    |3   |
|2014-01-03 00:00:00|H;X        |['H','X']            |[1,1]      |1   |
|2014-01-04 00:00:00|P;Q;G      |['P','Q','G']        |[1,1,1]    |1   |
|2014-01-05 00:00:00|S;T;U      |['S','T','U']        |[1,1,1]    |1   |
|2014-01-06 00:00:00|G;C;G      |['G','C','G']        |[1,2]      |2   |  
+-------------------+-----------+---------------------+-----------+----+

Большое спасибо!

Ответы [ 2 ]

0 голосов
/ 12 апреля 2020

Для Spark2.4+ это может быть достигнуто без нескольких groupBys и агрегаций (, поскольку они являются дорогостоящими операциями тасования больших данных ). Вы можете сделать это, используя one expression из функций высшего порядка transform и aggregate. Это должно быть каноническое решение для spark2.4.

from pyspark.sql import functions as F
df=spark.createDataFrame(data)
df.withColumn("list", F.split("flat","\;"))\
  .withColumn("occurances", F.expr("""array_sort(transform(array_distinct(list), x-> aggregate(list, 0,(acc,t)->acc+IF(t=x,1,0))))"""))\
  .withColumn("max", F.array_max("occurances"))\
  .show()
+-------------------+---------+---------------+----------+---+
|               date|     flat|           list|occurances|max|
+-------------------+---------+---------------+----------+---+
|2014-01-01 00:00:00|    A;A;B|      [A, A, B]|    [1, 2]|  2|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| [1, 1, 3]|  3|
|2014-01-03 00:00:00|      H;X|         [H, X]|    [1, 1]|  1|
|2014-01-04 00:00:00|    P;Q;G|      [P, Q, G]| [1, 1, 1]|  1|
|2014-01-05 00:00:00|    S;T;U|      [S, T, U]| [1, 1, 1]|  1|
|2014-01-06 00:00:00|    G;C;G|      [G, C, G]|    [1, 2]|  2|
+-------------------+---------+---------------+----------+---+
0 голосов
/ 12 апреля 2020

Вы можете сделать это с помощью пары операторов groupBy,

Для начала у вас есть такой фрейм данных,

+-------------------+---------+---------------+
|               date|     flat|           list|
+-------------------+---------+---------------+
|2014-01-01 00:00:00|    A;A;B|      [A, A, B]|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]|
|2014-01-03 00:00:00|      H;X|         [H, X]|
|2014-01-04 00:00:00|    P;Q;G|      [P, Q, G]|
|2014-01-05 00:00:00|    S;T;U|      [S, T, U]|
|2014-01-06 00:00:00|    G;C;G|      [G, C, G]|
+-------------------+---------+---------------+

Разнесите столбцы list, используя F.explode, как this,

new_frame_exp = new_frame.withColumn("exp", F.explode('list'))

Тогда ваш фрейм данных будет выглядеть следующим образом,

+-------------------+---------+---------------+---+
|               date|     flat|           list|exp|
+-------------------+---------+---------------+---+
|2014-01-01 00:00:00|    A;A;B|      [A, A, B]|  A|
|2014-01-01 00:00:00|    A;A;B|      [A, A, B]|  A|
|2014-01-01 00:00:00|    A;A;B|      [A, A, B]|  B|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]|  D|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]|  P|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]|  E|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]|  P|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]|  P|
|2014-01-03 00:00:00|      H;X|         [H, X]|  H|
|2014-01-03 00:00:00|      H;X|         [H, X]|  X|
|2014-01-04 00:00:00|    P;Q;G|      [P, Q, G]|  P|
|2014-01-04 00:00:00|    P;Q;G|      [P, Q, G]|  Q|
|2014-01-04 00:00:00|    P;Q;G|      [P, Q, G]|  G|
|2014-01-05 00:00:00|    S;T;U|      [S, T, U]|  S|
|2014-01-05 00:00:00|    S;T;U|      [S, T, U]|  T|
|2014-01-05 00:00:00|    S;T;U|      [S, T, U]|  U|
|2014-01-06 00:00:00|    G;C;G|      [G, C, G]|  G|
|2014-01-06 00:00:00|    G;C;G|      [G, C, G]|  C|
|2014-01-06 00:00:00|    G;C;G|      [G, C, G]|  G|
+-------------------+---------+---------------+---+

На этом фрейме данных создайте группу подобным образом,

new_frame_exp_agg = new_frame_exp.groupBy('date', 'flat', 'list', 'exp').count()

Затем вы будет иметь такой фрейм данных,

+-------------------+---------+---------------+---+-----+
|               date|     flat|           list|exp|count|
+-------------------+---------+---------------+---+-----+
|2014-01-03 00:00:00|      H;X|         [H, X]|  H|    1|
|2014-01-04 00:00:00|    P;Q;G|      [P, Q, G]|  G|    1|
|2014-01-05 00:00:00|    S;T;U|      [S, T, U]|  U|    1|
|2014-01-05 00:00:00|    S;T;U|      [S, T, U]|  T|    1|
|2014-01-04 00:00:00|    P;Q;G|      [P, Q, G]|  P|    1|
|2014-01-03 00:00:00|      H;X|         [H, X]|  X|    1|
|2014-01-06 00:00:00|    G;C;G|      [G, C, G]|  G|    2|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]|  E|    1|
|2014-01-06 00:00:00|    G;C;G|      [G, C, G]|  C|    1|
|2014-01-05 00:00:00|    S;T;U|      [S, T, U]|  S|    1|
|2014-01-01 00:00:00|    A;A;B|      [A, A, B]|  B|    1|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]|  D|    1|
|2014-01-04 00:00:00|    P;Q;G|      [P, Q, G]|  Q|    1|
|2014-01-01 00:00:00|    A;A;B|      [A, A, B]|  A|    2|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]|  P|    3|
+-------------------+---------+---------------+---+-----+

. На этом фрейме данных примените еще один уровень агрегации, чтобы собрать счетчики для перечисления и найти максимальное значение, как это,

res = new_frame_exp_agg.groupBy('date', 'flat', 'list').agg(
                                         F.collect_list('count').alias('occurances'),
                                         F.max('count').alias('max'))

res.orderBy('date').show()


+-------------------+---------+---------------+----------+---+
|               date|     flat|           list|occurances|max|
+-------------------+---------+---------------+----------+---+
|2014-01-01 00:00:00|    A;A;B|      [A, A, B]|    [2, 1]|  2|
|2014-01-02 00:00:00|D;P;E;P;P|[D, P, E, P, P]| [1, 1, 3]|  3|
|2014-01-03 00:00:00|      H;X|         [H, X]|    [1, 1]|  1|
|2014-01-04 00:00:00|    P;Q;G|      [P, Q, G]| [1, 1, 1]|  1|
|2014-01-05 00:00:00|    S;T;U|      [S, T, U]| [1, 1, 1]|  1|
|2014-01-06 00:00:00|    G;C;G|      [G, C, G]|    [1, 2]|  2|
+-------------------+---------+---------------+----------+---+

Если вы хотите столбец occurance отсортирован, вы можете использовать F.array_sort над столбцом, если вы используете версию 2.4+, иначе вам нужно написать для этого udf.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...