Использование Collect_set после взрыва в объекте groupedBy в Pyspark - PullRequest
0 голосов
/ 11 сентября 2018

У меня есть фрейм данных с такой схемой:

root
 |-- docId: string (nullable = true)
 |-- field_a: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- field_b: array (nullable = true)
 |    |-- element: string (containsNull = true)

Я хочу выполнить groupBy на field_a и использовать collect_set, чтобы сохранить все различные значения (в основном, внутренниезначения в списке) в field_b в агрегации, я не хочу добавлять новый столбец путем взрыва field_b и затем делать collect_set в агрегации.

Как я могу использовать udaf или pandas udf для достижения этой цели?

Например:

+---------------------+----------------+------------+
|docId                |field_b         |field_a     |
+---------------------+----------------+------------+
|k&;+B8ROh\\NmetBg=DiR|[IDN,SGP]       |[F]         |
|k&;.]^nX7HRdjIO`>S1+ |[IND,KWT]       |[M]         |
|k&;h&)8Sd\\JrDVL%VH>N|[IDN,IND]       |[M]         |
|k&<8nTqjrYNE8taji^$u |[IND,BHR]       |[F]         |
|k&=$M5Hmd6Y>&@'co-^1 |[IND,AUS]       |[M]         |
|k&>pIZ)g^!L/ht!T\'/"f|[IDN,KWT]       |[M]         |
|k&@ZX>Ph%rPdZ[,Pqsc. |[IND,MYS]       |[F]         |
|k&A]C>dmDXVN$hiVEUk/ |[IND,PHL]       |[F]         |
|k&BX1eGhumSQ6`7A8<Zd |[IND,SAU]       |[M]         |
|k&J)2Vo(k*[^c"Mg*f%) |[IND,SGP]       |[F]         |
+---------------------+----------------+------------+

Вывод, который я ищу:

+------------+--------------------------------+
|field_a     |collect_set(field__b)           |
+------------+--------------------------------+
|[F]         |[IDN,IND,SGP,BHR,MYS,PHL]       |
|[M]         |[IND,KWT,IDN,AUS,SAU,KWT]       |
+------------+--------------------------------+

1 Ответ

0 голосов
/ 19 сентября 2018

Я написал решение вашей проблемы, используя UDF для панд.Я не понял, почему ваш столбец field_a (представляющий пол?) Был списком, поэтому я превратил его в простую строку, но вы можете сделать это списком строк, если хотите.Вот оно:

(1) Создайте фиктивную df в пандах и создайте искру DataFrame:

import pandas as pd
import random
from pyspark.sql.functions import pandas_udf, PandasUDFType

a_list = ['F', 'M']
b_list = ['IDN', 'IND', 'SGP', 'BHR', 'MYS', 'PHL', 'AUS', 'SAU', 'KWT']
size = 10
dummy_df = pd.DataFrame({'docId': [random.randint(0,100) for _ in range(size)],
                         'field_b': [[random.choice(b_list), random.choice(b_list)] for _ in range(size)],
                         'field_a': [random.choice(a_list) for _ in range(size)]})

df = spark.createDataFrame(dummy_df)

производя:

+-----+-------+----------+
|docId|field_a|   field_b|
+-----+-------+----------+
|   23|      F|[SAU, SGP]|
|   36|      F|[IDN, PHL]|
|   82|      M|[BHR, SAU]|
|   30|      F|[AUS, IDN]|
|   75|      F|[AUS, MYS]|
|   46|      F|[SAU, IDN]|
|   11|      F|[SAU, BHR]|
|   71|      M|[KWT, IDN]|
|   50|      F|[IND, SGP]|
|   78|      F|[IND, SGP]|
+-----+-------+----------+

(2) Затем определите пандUDF, сгруппировать и применить:

@pandas_udf('field_a string, set_field_b array<string>', PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
    unique_values = pd.DataFrame(df['field_b'].values.tolist()).stack().unique().tolist()
    return pd.DataFrame({'field_a': df['field_a'].iloc[0], 'set_field_b': [unique_values]})

result = df.groupby('field_a').apply(my_pandas_udf)

, чтобы получить конечный результат:

+-------+--------------------+
|field_a|         set_field_b|
+-------+--------------------+
|      F|[SAU, SGP, IDN, P...|
|      M|[BHR, SAU, KWT, IDN]|
+-------+--------------------+

Мне не очень нравится подход панды / толист / стек / уникальный подход, возможно, есть лучшийЭто можно сделать, но обработка списков внутри фреймов данных Pandas, как правило, не проста.

Теперь вам нужно сравнить производительность с подходом explode + groupby + collect_set, не зная, какой из них будет быстрее.Расскажите нам, когда узнаете!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...