Поскольку столбец `transactions` имеет тип `string`, сначала нужно преобразовать его в тип `array`, затем применить `explode`, после чего с помощью `groupBy` + `collect_list` можно получить ожидаемый результат.
Example:
# Display up to 10 rows of the input without truncating long cell values.
df.show(n=10, truncate=False)
#+---------------+----------------------------------------------------------------------------------------------------------------+
#|candidate_email|transactions |
#+---------------+----------------------------------------------------------------------------------------------------------------+
#|cust1@email.com|[{'transaction_id':'10', 'transaction_amount':'$55.46'},{'transaction_id':'11', 'transaction_amount':'$545.46'}]|
#|cust1@email.com|[{'transaction_id':'12', 'transaction_amount':'$23.43'}] |
#+---------------+----------------------------------------------------------------------------------------------------------------+
# Turn the string column into a real array and emit one row per transaction:
#   1. replace every "}," with "}}," so each object keeps its closing brace
#      once the split below consumes one "},";
#   2. remove the "[" and "]" characters;
#   3. split on "}," and explode the resulting array.
# A raw string is used so the regex backslashes reach Spark SQL unchanged and
# Python does not warn about invalid escape sequences such as "\}".
df1 = df.selectExpr(
    "candidate_email",
    r"""explode(split(regexp_replace(regexp_replace(transactions,'(\\},)','}},'),'(\\[|\\])',''),"},")) as transactions""",
)
df1.show(10, False)
#+---------------+------------------------------------------------------+
#|candidate_email|transactions |
#+---------------+------------------------------------------------------+
#|cust1@email.com|{'transaction_id':'10', 'transaction_amount':'$55.46'}|
#|cust1@email.com|{'transaction_id':'11','transaction_amount':'$545.46'}|
#|cust1@email.com|{'transaction_id':'12', 'transaction_amount':'$23.43'}|
#+---------------+------------------------------------------------------+
# Group by candidate_email and collect every transaction string of a
# candidate into a single array column.
df2 = (
    df1.groupBy("candidate_email")
    .agg(collect_list(col("transactions")).alias("transactions"))
)
df2.show(n=10, truncate=False)
#+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|candidate_email|transactions |
#+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|cust1@email.com|[{'transaction_id':'10', 'transaction_amount':'$55.46'}, {'transaction_id':'11','transaction_amount':'$545.46'}, {'transaction_id':'12', 'transaction_amount':'$23.43'}]|
#+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# Build a single "json" column holding each row serialized as a JSON string.
(
    df2.selectExpr("to_json(struct(candidate_email,transactions)) as json")
    .show(n=10, truncate=False)
)
#+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|json |
#+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|{"candidate_email":"cust1@email.com","transactions":["{'transaction_id':'10', 'transaction_amount':'$55.46'}","{'transaction_id':'11','transaction_amount':'$545.46'}","{'transaction_id':'12', 'transaction_amount':'$23.43'}"]}|
#+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# Write the output to JSON files.
# df2 is already grouped per candidate_email, so it is written directly:
# grouping and collecting again would wrap the transactions in a second,
# nested array instead of the flat array expected in the file.
df2.write.mode("overwrite").json("<path>")
#content of file
#{"candidate_email":"cust1@email.com","transactions":["{'transaction_id':'10', 'transaction_amount':'$55.46'}","{'transaction_id':'11','transaction_amount':'$545.46'}","{'transaction_id':'12', 'transaction_amount':'$23.43'}"]}
# Convert each row to a JSON string with toJSON and collect them to the driver.
# df2 is already grouped per candidate_email, so it is used directly:
# grouping and collecting again would nest the transactions array one level
# deeper than the expected output.
df2.toJSON().collect()
#[u'{"candidate_email":"cust1@email.com","transactions":["{\'transaction_id\':\'10\', \'transaction_amount\':\'$55.46\'}","{\'transaction_id\':\'11\',\'transaction_amount\':\'$545.46\'}","{\'transaction_id\':\'12\', \'transaction_amount\':\'$23.43\'}"]}']