PySpark: how to properly aggregate JSON columns - PullRequest
1 vote
/ March 16, 2020

I have packed my nested JSON as string columns in my PySpark DataFrame, and I am trying to do an UPSERT on some of the columns based on a groupBy.

Input:

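from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list

# Notebooks usually predefine `spark` and `sc`; create them if they are missing
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext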

input_json = """[{
    "candidate_email": "cust1@email.com",
    "transactions":"[{'transaction_id':'10', 'transaction_amount':'$55.46'},{'transaction_id':'11', 'transaction_amount':'$545.46'}]"
},
{
    "candidate_email": "cust1@email.com",
    "transactions":"[{'transaction_id':'12', 'transaction_amount':'$23.43'}]"
}
]
"""
input_df = spark.read.json(sc.parallelize([input_json]), multiLine=True)
input_df.printSchema()
# root
#  |-- candidate_email: string (nullable = true)
#  |-- transactions: string (nullable = true)

Transformation and current output:

output_df = input_df.groupBy("candidate_email").agg(collect_list(col("transactions")).alias("transactions"))
output_df.printSchema()
output_df.collect()

# root
#  |-- candidate_email: string (nullable = true)
#  |-- transactions: array (nullable = true)
#  |    |-- element: string (containsNull = true)

# Out[161]:
# [Row(candidate_email='cust1@email.com', transactions=["[{'transaction_id':'10', 'transaction_amount':'$55.46'},{'transaction_id':'11', 'transaction_amount':'$545.46'}]", "[{'transaction_id':'12', 'transaction_amount':'$23.43'}]"])]

What changes should I make to the code above to get this output instead:

Desired output:

output_json = """[{
    "candidate_email": "cust1@email.com",
    "transactions":"[{'transaction_id':'10', 'transaction_amount':'$55.46'},{'transaction_id':'11', 'transaction_amount':'$545.46'}, {'transaction_id':'12', 'transaction_amount':'$23.43'}]"
}]"""
output_df = spark.read.json(sc.parallelize([output_json]), multiLine=True)
output_df.printSchema()
# root
#  |-- candidate_email: string (nullable = true)
#  |-- transactions: string (nullable = true)

Essentially, I am trying to get a clean merge into a single list instead of a list of several lists.
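In plain Python terms, the desired result is: parse each row's `transactions` string into a list, concatenate those lists per `candidate_email`, and keep one merged list per email. A minimal sketch that runs without Spark, assuming the strings are Python-style literals with single quotes (so `ast.literal_eval` is used rather than `json.loads`); the helper name `merge_transactions` is hypothetical.

```python
import ast


def merge_transactions(rows):
    """Merge the per-row transaction lists for each email (hypothetical helper).

    rows: iterable of (candidate_email, transactions_str) tuples, where
    transactions_str is a single-quoted, Python-literal list of dicts.
    Returns a dict mapping email -> one merged list, in row order.
    """
    merged = {}
    for email, transactions_str in rows:
        # The strings use single quotes, so they are not valid JSON;
        # ast.literal_eval parses them as Python literals instead.
        transactions = ast.literal_eval(transactions_str)
        merged.setdefault(email, []).extend(transactions)
    return merged


rows = [
    ("cust1@email.com",
     "[{'transaction_id':'10', 'transaction_amount':'$55.46'},"
     "{'transaction_id':'11', 'transaction_amount':'$545.46'}]"),
    ("cust1@email.com",
     "[{'transaction_id':'12', 'transaction_amount':'$23.43'}]"),
]

result = merge_transactions(rows)
for email, transactions in result.items():
    # str() of a list of dicts gives a single-quoted string like the input
    print(email, str(transactions))
```

The point of the sketch is the shape: one flat list of three transactions per email, not a list containing the two original lists.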

Thanks!

1 Answer

0 votes
/ March 16, 2020

Since the transactions column is of type string, we first have to convert it to an array type; then, with explode followed by groupBy + collect_list, we can get the expected result.
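To make the steps concrete, here is a plain-Python model of what the Spark operations do to the rows: `explode` turns one row holding an array into one row per element, and `groupBy` + `collect_list` gathers the elements back into one list per key. This is only an illustration that assumes the string has already been split into an array; `explode_rows` and `group_collect` are hypothetical names, not Spark API.

```python
from collections import defaultdict


def explode_rows(rows):
    """Model of explode(): emit one (key, element) row per array element."""
    for key, values in rows:
        for value in values:
            yield key, value


def group_collect(rows):
    """Model of groupBy(key).agg(collect_list(value))."""
    grouped = defaultdict(list)
    for key, value in rows:
        grouped[key].append(value)
    return dict(grouped)


# Rows after the string column has been split into an array (elements shortened)
rows = [
    ("cust1@email.com", ["{'transaction_id':'10'}", "{'transaction_id':'11'}"]),
    ("cust1@email.com", ["{'transaction_id':'12'}"]),
]

# Without explode: a list of lists, which is the problem in the question
nested = group_collect(rows)

# With explode first: one flat list per email
exploded = list(explode_rows(rows))
flat = group_collect(exploded)
print(nested)
print(flat)
```

Unlike this model, Spark does not guarantee the element order of `collect_list` after a shuffle.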

Example:

df = input_df  # the DataFrame from the question
df.show(10, False)
#+---------------+----------------------------------------------------------------------------------------------------------------+
#|candidate_email|transactions                                                                                                    |
#+---------------+----------------------------------------------------------------------------------------------------------------+
#|cust1@email.com|[{'transaction_id':'10', 'transaction_amount':'$55.46'},{'transaction_id':'11', 'transaction_amount':'$545.46'}]|
#|cust1@email.com|[{'transaction_id':'12', 'transaction_amount':'$23.43'}]                                                        |
#+---------------+----------------------------------------------------------------------------------------------------------------+

# To build a proper array: first replace '},' with '}},', then remove the '[' and ']' characters, then split on '},' (so each element keeps one closing '}'), and finally explode the resulting array.
df1=df.selectExpr("candidate_email","""explode(split(regexp_replace(regexp_replace(transactions,'(\\\},)','}},'),'(\\\[|\\\])',''),"},")) as transactions""")

df1.show(10,False)
#+---------------+------------------------------------------------------+
#|candidate_email|transactions                                          |
#+---------------+------------------------------------------------------+
#|cust1@email.com|{'transaction_id':'10', 'transaction_amount':'$55.46'}|
#|cust1@email.com|{'transaction_id':'11','transaction_amount':'$545.46'}|
#|cust1@email.com|{'transaction_id':'12', 'transaction_amount':'$23.43'}|
#+---------------+------------------------------------------------------+

# Group by email and collect the exploded elements back into a single list
df2=df1.groupBy("candidate_email").\
agg(collect_list(col("transactions")).alias("transactions"))

df2.show(10,False)


#+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|candidate_email|transactions                                                                                                                                                            |
#+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|cust1@email.com|[{'transaction_id':'10', 'transaction_amount':'$55.46'}, {'transaction_id':'11','transaction_amount':'$545.46'}, {'transaction_id':'12', 'transaction_amount':'$23.43'}]|
#+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

# Create a JSON string column from the email and the collected list
df2.selectExpr("to_json(struct(candidate_email,transactions)) as json").show(10,False)
#+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|json                                                                                                                                                                                                                             |
#+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|{"candidate_email":"cust1@email.com","transactions":["{'transaction_id':'10', 'transaction_amount':'$55.46'}","{'transaction_id':'11','transaction_amount':'$545.46'}","{'transaction_id':'12', 'transaction_amount':'$23.43'}"]}|
#+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

# Write the output to a JSON file (df2 is already grouped; grouping it again would nest the lists)

df2.write.mode("overwrite").json("<path>")

# Content of the file:
#{"candidate_email":"cust1@email.com","transactions":["{'transaction_id':'10', 'transaction_amount':'$55.46'}","{'transaction_id':'11','transaction_amount':'$545.46'}","{'transaction_id':'12', 'transaction_amount':'$23.43'}"]}

# Converting to JSON strings with toJSON()
df2.toJSON().collect()
#[u'{"candidate_email":"cust1@email.com","transactions":["{\'transaction_id\':\'10\', \'transaction_amount\':\'$55.46\'}","{\'transaction_id\':\'11\',\'transaction_amount\':\'$545.46\'}","{\'transaction_id\':\'12\', \'transaction_amount\':\'$23.43\'}"]}']
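The string surgery used for `df1` can be checked outside Spark with Python's `re` module, which treats these simple patterns the same way. A sketch, assuming each transaction object contains no nested braces or square brackets (the same assumption the regex approach relies on); `split_transactions` is a hypothetical helper. The last lines join the pieces back into one bracketed string, which is the shape the question asked for. In Spark itself, a similar join of the collected array could be done with `concat_ws(', ', ...)` wrapped in `[` and `]`; that Spark variant is not tested here.

```python
import ast
import re


def split_transactions(transactions):
    """Split one "[{...},{...}]" string into a list of "{...}" strings.

    Assumes the objects contain no nested braces or square brackets.
    """
    # Mirrors the answer's Spark SQL expression:
    # split(regexp_replace(regexp_replace(s, '\},', '}},'), '\[|\]', ''), '},')
    doubled = re.sub(r"\},", "}},", transactions)  # '},' -> '}},'
    stripped = re.sub(r"\[|\]", "", doubled)       # drop '[' and ']'
    return re.split(r"\},", stripped)              # each piece keeps one '}'


rows = [
    "[{'transaction_id':'10', 'transaction_amount':'$55.46'},"
    "{'transaction_id':'11', 'transaction_amount':'$545.46'}]",
    "[{'transaction_id':'12', 'transaction_amount':'$23.43'}]",
]

# explode + collect_list for a single email: concatenate the pieces in order
pieces = [piece for row in rows for piece in split_transactions(row)]

# Join back into ONE string, the shape of the question's desired output
merged = "[" + ", ".join(pieces) + "]"
print(merged)

# Sanity check: the merged string parses as a list of dicts
print(len(ast.literal_eval(merged)))
```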