Pyspark: explode a list of dicts and group them by a dict key
0 votes
/ 29 January 2020

I have a dataframe that contains a list of dictionaries; I want to split each dictionary out and create a row based on one of its key values.

Sample data:

[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},{"col.1":"13ABC","col.2":"141","col.3":"","col.4":"ABCD"}]

Input DataFrame:

+---------------------------------------------------------------------------------------------------------------------------+
|ID|DATASET                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------+
|4A|[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},{"col.1":"13ABC","col.2":"141","col.3":"","col.4":"ABCD"}]   |
|4B|[]                                                                                                                      |
+---------------------------------------------------------------------------------------------------------------------------+

Expected output:

+---+-------+-------------------------------------------+
|ID |col_1  |col                                        |
+---+-------+-------------------------------------------+
|4A |"12ABC"|"{"col.2":"141","col.3":"","col.4":"ABCD"}"|
|4A |"13ABC"|"{"col.2":"141","col.3":"","col.4":"ABCD"}"|
|4B |""     |""                                         |
+---+-------+-------------------------------------------+

I tried creating a schema for the DATASET column and splitting the data, but this splits the records without grouping and combining them based on the key value:

schema = spark.read.json(df.rdd.map(lambda row: row.DATASET)).schema

I also looked at the linked question Convert a Pyspark dataframe into a dictionary.

Thanks in advance.
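Independent of Spark, the per-dict transformation being asked for can be sketched in plain Python with the standard-library `json` module (the helper name `split_record` is my own, for illustration only):

```python
import json

def split_record(dataset_json):
    """Parse a JSON array of dicts; for each dict return a pair of
    (value of "col.1", JSON string of the remaining keys)."""
    rows = []
    for d in json.loads(dataset_json):
        key = d.pop("col.1")          # the key value that drives the grouping
        rows.append((key, json.dumps(d)))  # rest of the dict, re-serialized
    return rows

dataset = ('[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},'
           '{"col.1":"13ABC","col.2":"141","col.3":"","col.4":"ABCD"}]')
for key, rest in split_record(dataset):
    print(key, rest)
```

In Spark the same effect is usually obtained by parsing DATASET with a schema (e.g. via `from_json`), exploding the resulting array, and re-serializing the remaining fields with `to_json`.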

EDITED

df2.withColumn("col", f.to_json(f.struct("`col.1`","`col.2`"))).show(truncate=False)

RESULT:

+---+-----+-----+-----+-----+-------------------------------+
|ID |col.1|col.2|col.3|col.4|col                            |
+---+-----+-----+-----+-----+-------------------------------+
|4A |12ABC|141  |     |ABCD |{"col.1":"12ABC","col.2":"141"}|
|4A |13ABC|141  |     |ABCD |{"col.1":"13ABC","col.2":"141"}|
+---+-----+-----+-----+-----+-------------------------------+

Answers [ 2 ]

0 votes
/ 07 February 2020

Instead of regular expressions, we can use the posexplode and concat_ws functions.

from pyspark.sql import functions as F

df2.select(
        "*",
        F.posexplode(F.split("DATASET", ",")).alias("pos", "token")
    )\
    .where("pos > 0")\
    .groupBy("ID", "DATASET")\
    .agg(F.concat_ws("_", F.collect_list("token")).alias("data_cols"))\
    .select(
        "ID",
        F.split("DATASET", ",").getItem(0).alias("col_id"),
        "data_cols"
    )\
    .show(truncate=False)
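For reference, here is what the comma split produces on the sample row, traced with plain `str.split` (which mirrors `F.split` with a literal pattern). Note that commas also occur *inside* each dict, so the tokens are key/value fragments rather than whole dicts, which is why the split-based approach needs care:

```python
dataset = ('[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},'
           '{"col.1":"13ABC","col.2":"141","col.3":"","col.4":"ABCD"}]')

tokens = dataset.split(",")
print(tokens[0])   # token at pos 0, the one .getItem(0) keeps
print(tokens[1])   # tokens at pos > 0 feed concat_ws/collect_list
```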
0 votes
/ 03 February 2020

I tried the approach below and it works. I am not sure it is the most optimized approach; open to inputs and improvements.

Input df:

>>> df.show(truncate=False)
+--------------------------------------------------------------------------------------------------------------------+---+-----+-----+
|DATASET                                                                                                             |ID |count|level|
+--------------------------------------------------------------------------------------------------------------------+---+-----+-----+
|[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},{"col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"}]|4A |156  |36   |
|[]                                                                                                                  |4B |179  |258  |
|[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"}]                                                         |4C |222  |158  |
|[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},{"col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"}]  |4D |222  |158  |
+--------------------------------------------------------------------------------------------------------------------+---+-----+-----+


# Splitting the list of dictionaries into rows
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.functions import explode, split, regexp_replace, col, collect_list
>>> df2 = df.withColumn("data",explode(split(regexp_replace(col("DATASET"), "(^\[)|(\]$)", ""), ", "))).withColumn("data",explode(split('data','},'))).withColumn("data",explode(split(regexp_replace(col("data"), "(^\{)|(\}$)", ""), ", ")))
>>> df2 = df2.drop(df2.DATASET)
>>> df2.show(truncate=False)
+---+-----+-----+-------------------------------------------------------+
|ID |count|level|data                                                   |
+---+-----+-----+-------------------------------------------------------+
|4A |156  |36   |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|
|4A |156  |36   |"col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD" |
|4B |179  |258  |                                                       |
|4C |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|
|4D |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|
|4D |222  |158  |"col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"   |
+---+-----+-----+-------------------------------------------------------+
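The regex chain above can be traced in plain Python: `re.sub` mirrors `regexp_replace`, so one pass strips the outer brackets, a split breaks the list on the `},` boundary between dicts, and a second pass strips the braces around each element (variable names below are for illustration):

```python
import re

dataset = ('[{"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"},'
           '{"col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"}]')

# strip the outer [ ] (mirrors the first regexp_replace)
inner = re.sub(r"(^\[)|(\]$)", "", dataset)
# break the list on the "}," boundary between dicts (mirrors split('data','},'))
parts = inner.split("},")
# strip the { } around each element (mirrors the second regexp_replace)
rows = [re.sub(r"(^\{)|(\}$)", "", p) for p in parts]
for r in rows:
    print(r)
```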


#Getting the col.1 key/value pair from the separated dict
>>> col_1 = F.split(df2['data'],',')
>>> df3 = df2.withColumn('col_1', col_1.getItem(0))
>>> df3.show(truncate=False)
+---+-----+-----+-------------------------------------------------------+---------------+
|ID |count|level|data                                                   |col_1          |
+---+-----+-----+-------------------------------------------------------+---------------+
|4A |156  |36   |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"col.1":"12ABC"|
|4A |156  |36   |"col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD" |"col.1":"13ABC"|
|4B |179  |258  |                                                       |               |
|4C |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"col.1":"12ABC"|
|4D |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"col.1":"12ABC"|
|4D |222  |158  |"col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"   |"col.1":"12ABC"|
+---+-----+-----+-------------------------------------------------------+---------------+


#Getting value of col.1
>>> col_1 = F.split(df3['col_1'],':')
>>> df3 =df3.withColumn('col_1',col_1.getItem(1)).drop(df3.col_1)
>>> df3.show(truncate=False)
+---+-----+-----+-------------------------------------------------------+-------+
|ID |count|level|data                                                   |col_1  |
+---+-----+-----+-------------------------------------------------------+-------+
|4A |156  |36   |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"12ABC"|
|4A |156  |36   |"col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD" |"13ABC"|
|4B |179  |258  |                                                       |null   |
|4C |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"12ABC"|
|4D |222  |158  |"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"|"12ABC"|
|4D |222  |158  |"col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"   |"12ABC"|
+---+-----+-----+-------------------------------------------------------+-------+
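The two split steps on a single exploded fragment amount to this in plain Python:

```python
fragment = '"col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"'

# split(',').getItem(0) keeps the key/value pair for col.1
pair = fragment.split(",")[0]
# split(':').getItem(1) then keeps just the value, quotes included
value = pair.split(":")[1]
print(pair, value)
```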


#Grouping data by ID and col_1 columns
>>> grouping_cols = ["col_1", "ID"]
>>> grp_df = df3.groupBy(grouping_cols).agg(collect_list("data").name("dataset"))
>>> grp_df.show(truncate=False)
+-------+---+---------------------------------------------------------------------------------------------------------------+
|col_1  |ID |dataset                                                                                                        |
+-------+---+---------------------------------------------------------------------------------------------------------------+
|"13ABC"|4A |["col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"]                                                       |
|null   |4B |[]                                                                                                             |
|"12ABC"|4D |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD", "col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"]|
|"12ABC"|4A |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
|"12ABC"|4C |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
+-------+---+---------------------------------------------------------------------------------------------------------------+

>>> grp_df = grp_df.selectExpr("col_1 as grp_col", "ID as ID","dataset as dataset")
>>> grp_df.show(truncate=False)
+-------+---+---------------------------------------------------------------------------------------------------------------+
|grp_col|ID |dataset                                                                                                        |
+-------+---+---------------------------------------------------------------------------------------------------------------+
|"13ABC"|4A |["col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"]                                                       |
|null   |4B |[]                                                                                                             |
|"12ABC"|4D |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD", "col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"]|
|"12ABC"|4A |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
|"12ABC"|4C |["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
+-------+---+---------------------------------------------------------------------------------------------------------------+


# joining grouped df with main dataframe and removing dupes
>>> final_df = df3.join(grp_df,'ID').dropDuplicates().drop('data').drop('col_1')
>>> final_df.show(truncate=False)
+---+-----+-----+-------+---------------------------------------------------------------------------------------------------------------+
|ID |count|level|grp_col|dataset                                                                                                        |
+---+-----+-----+-------+---------------------------------------------------------------------------------------------------------------+
|4B |179  |258  |null   |[]                                                                                                             |
|4C |222  |158  |"12ABC"|["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
|4A |156  |36   |"13ABC"|["col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"]                                                       |
|4A |156  |36   |"12ABC"|["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
|4A |156  |36   |"13ABC"|["col.1":"13ABC","col.2":"141","col.3":"","col.4":"FCD"]                                                       |
|4A |156  |36   |"12ABC"|["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD"]                                                      |
|4D |222  |158  |"12ABC"|["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD", "col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"]|
|4D |222  |158  |"12ABC"|["col.1":"12ABC","col.2":"141","col.3":"","col.4":"ABCD", "col.1":"12ABC","col.2":"1","col.3":"","col.4":"ECD"]|
+---+-----+-----+-------+---------------------------------------------------------------------------------------------------------------+