Для вопроса-2, на основе вашего кода, я бы посоветовал внести некоторые изменения, как показано ниже:
- Настройте item_keys , включая Id , Имя и Код и объединение одного и того же logi c с использованием списков
- Используйте struct вместо array для реализовать вышеуказанный логи c
- Нет необходимости создавать Python словарь для NotDefned_Attribute_List , список кортежей достаточно и лучше
См. шаги ниже:
(1) Настройте две агрегатные функции для вычисления item_map , используемого для testing_mappings и NotDefined_Attribute_List . проверьте named_struct и struct (два метода для одной и той же задачи в ваших упражнениях)
from itertools import chain
from pyspark.sql.functions import expr, collect_set, struct, col
item_keys = ['Id', 'Name', 'Code']
# use SQL expression
m1_by_sql_expr = expr("""
collect_set(
named_struct(
'attr_name', PrimaryLookupAttributeName,
'attr_value', PrimaryLookupAttributeValue,
'Id', OutputItemIdByValue,
'Name', OutputItemNameByValue,
'Code', OutputItemCodeByValue
)
) as item_map
""")
# use PySpark API functions
m2_by_func = collect_set(
struct(
col('DomainName').alias('domain'),
col('TargetAttributeForId').alias('Id'),
col('TargetAttributeForName').alias('Name'),
col('TargetAttributeForCode').alias('Code')
)
).alias('item_map')
(2) Настройте ItemKey ( Id , Код или Имя ) + PrimaryLookupAttributeName + PrimaryLookupAttributeValue сопоставление с ItemValue
m1 = NotDefined_filterDomainLookup.agg(m1_by_sql_expr).first().item_map
"""create a list of tuples of (map_key, map_value) to create MapType column:
| map_key = concat_ws('\0', item_key, attr_name, attr_value)
| map_value = item_value
"""
testingId = [('\0'.join([k, row.attr_name, row.attr_value]), row[k]) for row in m1 for k in item_keys if row[k]]
#[('Id\x00LeaseRecoveryType\x00Gross w/base year', '18'),
# ('Name\x00LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'),
# ('Id\x00LeaseStatus\x00Abandoned', '10'),
# ('Name\x00LeaseStatus\x00Abandoned', 'Active'),
# ('Id\x00LeaseStatus\x00Draft', '10'),
# ('Name\x00LeaseStatus\x00Draft', 'Pending'),
# ('Id\x00LeaseStatus\x00Archive', '11'),
# ('Name\x00LeaseStatus\x00Archive', 'Expired'),
# ('Id\x00LeaseStatus\x00Terminated', '10'),
# ('Name\x00LeaseStatus\x00Terminated', 'Terminated'),
# ('Id\x00LeaseRecoveryType\x00Gross', '11'),
# ('Name\x00LeaseRecoveryType\x00Gross', 'Gross'),
# ('Id\x00LeaseRecoveryType\x00Gross-modified', '15'),
# ('Name\x00LeaseRecoveryType\x00Gross-modified', 'Modified Gross')]
# this could be a problem for too many entries.
testing_mappings = create_map([lit(i) for i in chain.from_iterable(testingId)])
(3) Создать NotDefined_AttributeCode_List (тот же logi c, что и в (2), использовать функции PySpark API для m2)
m2 = matchedDomains.agg(m2_by_func).first().item_map
NotDefned_Attribute_List = [(k, row.domain, row[k]) for row in m2 for k in item_keys if row[k]]
(4) Получить список дополнительных столбцов на основе NotDefined_Attribute_List :
additional_cols = [
testing_mappings[concat_ws('\0', lit(k), lit(c), col(c))].alias(c_name)
for k,c,c_name in NotDefined_Attribute_List
]
(5) выберите дополнительные столбцы
if count_ND > 0:
# move code above in (2), (3) and (4) here
# set up testing_NotDefined
testing_NotDefined = datasetMatchedPortfolio.select("*", *additional_cols)
else:
print("no Not Defines exist")