pySpark повторяет повторяющиеся переменные - PullRequest
1 голос
/ 28 мая 2020

У меня есть код, который в настоящее время работает, однако я хочу сделать его более эффективным и избежать жесткого кодирования:

1) избегайте жесткого кодирования: для NotDefined_filterDomainLookup хотелось бы ссылаться на default_reference df для соответствующего кода и имени, когда Id = 4. Вместо жесткого кодирования значения кода и имени.

Вопрос 1

список имен столбцов и соответствующие им имена новых столбцов

test_matchedAttributeName_List =dict(matchedDomains.agg(collect_set(array('DomainName', 'TargetAttributeForName')).alias('m')).first().m)

Output: {'LeaseType': 'ConformedLeaseTypeName', 'LeaseRecoveryType': 'ConformedLeaseRecoveryTypeName', 'LeaseStatus': 'ConformedLeaseStatusName'}

рабочий код, кроме как избежать жесткого кодирования. В частности, я хотел бы сослаться на default_reference df для соответствующего кода и имени, когда Id = 4

cond = col('PrimaryLookupAttributeName').isNull() & col('SecondaryLookupAttributeName').isNull()

NotDefined_filterDomainLookup = filterDomainLookup \
    .withColumn('OutputItemIdByAttribute', when(cond, lit('4')).otherwise(col('OutputItemIdByAttribute'))) \
    .withColumn('OutputItemCodeByAttribute', when(cond, lit('N/D')).otherwise(col('OutputItemCodeByAttribute'))) \
    .withColumn('OutputItemNameByAttribute', when(cond, lit('Not Defined')).otherwise(col('OutputItemNameByAttribute'))) 

------------ + ----- ------------------ + ------------------------- + ----- -----------

1 Ответ

1 голос
/ 28 мая 2020

Для вопроса-2, на основе вашего кода, я бы посоветовал внести некоторые изменения, как показано ниже:

  • Настройте item_keys , включая Id , Имя и Код и объединение одного и того же logi c с использованием списков
  • Используйте struct вместо array для реализовать вышеуказанный логи c
  • Нет необходимости создавать Python словарь для NotDefned_Attribute_List , список кортежей достаточно и лучше

См. шаги ниже:

(1) Настройте две агрегатные функции для вычисления item_map , используемого для testing_mappings и NotDefined_Attribute_List . проверьте named_struct и struct (два метода для одной и той же задачи в ваших упражнениях)

from itertools import chain
from pyspark.sql.functions import expr, collect_set, struct, col

item_keys = ['Id', 'Name', 'Code']

# use SQL expression
m1_by_sql_expr = expr("""
  collect_set(
    named_struct(
      'attr_name', PrimaryLookupAttributeName,
      'attr_value', PrimaryLookupAttributeValue,
      'Id', OutputItemIdByValue,
      'Name', OutputItemNameByValue,
      'Code', OutputItemCodeByValue
    )
  ) as item_map
""")

# use PySpark API functions
m2_by_func = collect_set(
    struct(
      col('DomainName').alias('domain'),
      col('TargetAttributeForId').alias('Id'),
      col('TargetAttributeForName').alias('Name'),
      col('TargetAttributeForCode').alias('Code')
    )
  ).alias('item_map')

(2) Настройте ItemKey ( Id , Код или Имя ) + PrimaryLookupAttributeName + PrimaryLookupAttributeValue сопоставление с ItemValue

  m1 = NotDefined_filterDomainLookup.agg(m1_by_sql_expr).first().item_map

  """create a list of tuples of (map_key, map_value) to create MapType column:
     | map_key = concat_ws('\0', item_key, attr_name, attr_value)
     | map_value = item_value
  """
  testingId = [('\0'.join([k, row.attr_name, row.attr_value]), row[k]) for row in m1 for k in item_keys if row[k]]
  #[('Id\x00LeaseRecoveryType\x00Gross w/base year', '18'),
  # ('Name\x00LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'),
  # ('Id\x00LeaseStatus\x00Abandoned', '10'),
  # ('Name\x00LeaseStatus\x00Abandoned', 'Active'),
  # ('Id\x00LeaseStatus\x00Draft', '10'),
  # ('Name\x00LeaseStatus\x00Draft', 'Pending'),
  # ('Id\x00LeaseStatus\x00Archive', '11'),
  # ('Name\x00LeaseStatus\x00Archive', 'Expired'),
  # ('Id\x00LeaseStatus\x00Terminated', '10'),
  # ('Name\x00LeaseStatus\x00Terminated', 'Terminated'),
  # ('Id\x00LeaseRecoveryType\x00Gross', '11'),
  # ('Name\x00LeaseRecoveryType\x00Gross', 'Gross'),
  # ('Id\x00LeaseRecoveryType\x00Gross-modified', '15'),
  # ('Name\x00LeaseRecoveryType\x00Gross-modified', 'Modified Gross')]

  # this could be a problem for too many entries.
  testing_mappings = create_map([lit(i) for i in chain.from_iterable(testingId)])

(3) Создать NotDefined_AttributeCode_List (тот же logi c, что и в (2), использовать функции PySpark API для m2)

  m2 = matchedDomains.agg(m2_by_func).first().item_map

  NotDefned_Attribute_List = [(k, row.domain, row[k]) for row in m2 for k in item_keys if row[k]]

(4) Получить список дополнительных столбцов на основе NotDefined_Attribute_List :

  additional_cols = [
    testing_mappings[concat_ws('\0', lit(k), lit(c), col(c))].alias(c_name)
      for k,c,c_name in NotDefined_Attribute_List
  ]

(5) выберите дополнительные столбцы

if count_ND > 0: 

  # move code above in (2), (3) and (4) here

  # set up testing_NotDefined
  testing_NotDefined = datasetMatchedPortfolio.select("*", *additional_cols)

else:
  print("no Not Defines exist")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...