IIUC, вы можете запустить flatMap (), используя понимание списка, чтобы перебрать 2-й элемент из 4-элементных кортежей (1 строка + 3 списка), например:
from pyspark.sql import Row
myrdd = sc.parallelize(rdd)
myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ).collect()
#[({'primary_id': 'xxxxx99'},
# {'cov_id': 'Q', 'cov_cd': '100', 'cov_amt': '100', 'cov_state': 'AZ'},
# {'pol_cat_id': '234', 'pol_dt': '20100220'},
# {'qor_pol_id': '23492', 'qor_cd': '30'}),
# ......
Краткое объяснение: в понимании списка функции flatMap, помимо итерации 2-го элемента x[1]
(как z
, который является словарем), я также преобразовал первый элемент String x [0] в словарь с одной записью: {"primary_id":x[0]}
и возьмите первый элемент из x [2] и x [3] , оба из которых являются словарями.
Таким образом, после запуска вышеуказанной функции flatMap, элемент RDD становится кортежем из 4 словарей, и вам нужно просто объединить их.ниже приведен пример кода для сопоставления кортежа из 4-х словарей в объекте Row. Возможно, вам придется изменить логику обработки исключений и пропущенных полей в соответствии с вашими требованиями.
cols = ['primary_id', 'cov_id', 'cov_cd', 'cov_amt', 'cov_state', 'pol_cat_id', 'pol_dt', 'qor_pol_id', 'qor_cd']
def merge_dict(arr, cols):
row = {}
try:
for e in arr:
if type(e) is dict: row.update(e)
except:
pass
finally:
return Row(**dict({ c:row.get(c, None) for c in cols })) if row else None
myrdd.flatMap(lambda x: [ ({'primary_id':x[0]}, z, x[2][0], x[3][0]) for z in x[1] ] ) \
.map(lambda x: merge_dict(x, cols)) \
.filter(bool) \
.toDF() \
.show()
+-------+------+------+---------+----------+--------+----------+------+----------+
|cov_amt|cov_cd|cov_id|cov_state|pol_cat_id| pol_dt|primary_id|qor_cd|qor_pol_id|
+-------+------+------+---------+----------+--------+----------+------+----------+
| 100| 100| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 200| 33| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 10| 64| Q| AZ| 234|20100220| xxxxx99| 30| 23492|
| 100| 20| R| TX| 532|20091020| xxxxx86| 21| 49320|
| 500| 44| R| TX| 532|20091020| xxxxx86| 21| 49320|
| 50| 66| R| TX| 532|20091020| xxxxx86| 21| 49320|
+-------+------+------+---------+----------+--------+----------+------+----------+
Кстати. Если вы хотите, чтобы ваша исходная функция работала, проверьте следующие 5 строк, содержащие #<--
:
def flatten_map(record):
try:
#yield(record) #<-- comment this out, no need unprocessed data in output
# Unpack items
id, items, line, pls = record
pol_id = line[0]["pol_cat_id"] #<-- from line[0] not pls
pol_dt = line[0]["pol_dt"] #<-- from line[0] not pls
qor_id = pls[0]["qor_pol_id"] #<-- from pls[0] not pls
for item in items:
#<-- below line removed the ending ", 1", thus no need the last map() function to flatten tuples
yield (id,item["cov_id"],item["cov_cd"], item["cov_amt"], item["cov_state"], pol_id, pol_dt, qor_id)
except Exception as e:
pass