Окончательный результат по желанию OP - это list
из dictionaries
, где все строки в кадре данных, которые имеют одинаковые Venue
и Date
, были объединены.
# Creating the DataFrames
df_Incoming = sqlContext.createDataFrame([('Hotel','20190101',15,3),('Beach','20190101',93,45)],('Venue','Date','08','10'))
df_Incoming.show()
+-----+--------+---+---+
|Venue| Date| 08| 10|
+-----+--------+---+---+
|Hotel|20190101| 15| 3|
|Beach|20190101| 93| 45|
+-----+--------+---+---+
df_Outgoing = sqlContext.createDataFrame([('Beach','20190101',30,5),('Hotel','20190103',5,15)],('Venue','Date','07','10'))
df_Outgoing.show()
+-----+--------+---+---+
|Venue| Date| 07| 10|
+-----+--------+---+---+
|Beach|20190101| 30| 5|
|Hotel|20190103| 5| 15|
+-----+--------+---+---+
Идея состоит в том, чтобы создать dictionary
из каждого row
и сохранить все rows
из DataFrame
в виде словарей в одном большом list
. И в качестве последнего шага мы объединяем вместе те словари, которые имеют одинаковые Venue
и Date
.
Поскольку все rows
в DataFrame хранятся как Row()
объекты, мы используем функцию collect () , чтобы вернуть все записи как list
из Row()
. Просто чтобы проиллюстрировать вывод -
print(df_Incoming.collect())
[Row(Venue='Hotel', Date='20190101', 08=15, 10=3), Row(Venue='Beach', Date='20190101', 08=93, 10=45)]
Но, поскольку мы хотим list
из dictionaries
, мы можем использовать list comprehensions
для преобразования их в единицу -
list_Incoming = [row.asDict() for row in df_Incoming.collect()]
print(list_Incoming)
[{'10': 3, 'Date': '20190101', 'Venue': 'Hotel', '08': 15}, {'10': 45, 'Date': '20190101', 'Venue': 'Beach', '08': 93}]
Но, поскольку числовые столбцы были в форме, подобной "08":{ "IncomingCount":15 }
, вместо "08":15
, поэтому мы используем dictionary comprehensions
для преобразования их в эту форму -
list_Incoming = [ {k:v if k in ['Venue','Date'] else {'IncomingCount':v} for k,v in dict_element.items()} for dict_element in list_Incoming]
print(list_Incoming)
[{'10': {'IncomingCount': 3}, 'Date': '20190101', 'Venue': 'Hotel', '08': {'IncomingCount': 15}}, {'10': {'IncomingCount': 45}, 'Date': '20190101', 'Venue': 'Beach', '08': {'IncomingCount': 93}}]
Аналогично, мы делаем для OutgoingCount
list_Outgoing = [row.asDict() for row in df_Outgoing.collect()]
list_Outgoing = [ {k:v if k in ['Venue','Date'] else {'OutgoingCount':v} for k,v in dict_element.items()} for dict_element in list_Outgoing]
print(list_Outgoing)
[{'10': {'OutgoingCount': 5}, 'Date': '20190101', 'Venue': 'Beach', '07': {'OutgoingCount': 30}}, {'10': {'OutgoingCount': 15}, 'Date': '20190103', 'Venue': 'Hotel', '07': {'OutgoingCount': 5}}]
Последний шаг: Теперь, когда мы создали реквизит list
из dictionaries
, нам нужно объединить список на основе Venue
и Date
.
from copy import deepcopy
def merge_lists(list_Incoming, list_Outgoing):
# create dictionary from list_Incoming:
dict1 = {(record['Venue'], record['Date']): record for record in list_Incoming}
#compare elements in list_Outgoing to those on list_Incoming:
result = {}
for record in list_Outgoing:
ckey = record['Venue'], record['Date']
new_record = deepcopy(record)
if ckey in dict1:
for key, value in dict1[ckey].items():
if key in ('Venue', 'Date'):
# Do not merge these keys
continue
# Dict's "setdefault" finds a key/value, and if it is missing
# creates a new one with the second parameter as value
new_record.setdefault(key, {}).update(value)
result[ckey] = new_record
# Add values from list_Incoming that were not matched in list_Outgoing:
for key, value in dict1.items():
if key not in result:
result[key] = deepcopy(value)
return list(result.values())
res = merge_lists(list_Incoming, list_Outgoing)
print(res)
[{'10': {'OutgoingCount': 5, 'IncomingCount': 45},
'Date': '20190101',
'Venue': 'Beach',
'08': {'IncomingCount': 93},
'07': {'OutgoingCount': 30}
},
{'10': {'OutgoingCount': 15},
'Date': '20190103',
'Venue': 'Hotel',
'07': {'OutgoingCount': 5}
},
{'10': {'IncomingCount': 3},
'Date': '20190101',
'Venue': 'Hotel',
'08': {'IncomingCount': 15}
}]