pySpark dict (ключ, значение, значение) - PullRequest
0 голосов
/ 27 мая 2020

Ошибка PySpark при использовании when с фреймами данных

попытка pySpark

test = filterDomainLookup.when((col("PrimaryLookupAttributeName") == "") & (col("SecondaryLookupAttributeName") == ""), "Not Defined")
display(test)


AttributeError: 'DataFrame' object has no attribute 'when'

1 Ответ

2 голосов
/ 27 мая 2020

Попробуйте что-то вроде следующего:

from pyspark.sql.functions import lit, col, when

cond = col('PrimaryLookupAttributeName').isNull() & col('SecondaryLookupAttributeName').isNull()
# or in your example:
# (col("PrimaryLookupAttributeName") == "") & (col("SecondaryLookupAttributeName") == "")

df1 = filterDomainLookup \
    .withColumn('OutputItemIdByValue', when(cond, lit('4')).otherwise(col('OutputItemIdByValue'))) \
    .withColumn('OutputItemCodeByValue', when(cond, lit('N/D')).otherwise(col('OutputItemCodeByValue'))) \
    .withColumn('OutputItemNameByValue', when(cond, lit('Not Defined')).otherwise(col('OutputItemNameByValue'))) 

df1.select('DomainName', 'PrimaryLookupAttributeName', *[c for c in filterDomainLookup.columns if c.startswith('Out')]).show()
#+-----------------+--------------------------+-------------------+---------------------+---------------------+
#|       DomainName|PrimaryLookupAttributeName|OutputItemIdByValue|OutputItemCodeByValue|OutputItemNameByValue|
#+-----------------+--------------------------+-------------------+---------------------+---------------------+
#|LeaseRecoveryType|                      null|                  4|                  N/D|          Not Defined|
#|LeaseRecoveryType|         LeaseRecoveryType|                 11|                 null|                Gross|
#|LeaseRecoveryType|         LeaseRecoveryType|                 18|                 null|       Modified Gross|
#|      LeaseStatus|               LeaseStatus|                 10|                 null|               Active|
#|      LeaseStatus|               LeaseStatus|                 10|                 null|           Terminated|
#|      LeaseStatus|               LeaseStatus|                 11|                 null|              Expired|
#|      LeaseStatus|               LeaseStatus|                 10|                 null|              Pending|
#+-----------------+--------------------------+-------------------+---------------------+---------------------+

Обновление: как описано в другом сообщении, когда ItemID, ItemCode и ItemName извлекаются из default_reference, создайте сопоставление между ItemId и ItemName и ItemCode:

from pyspark.sql.functions import collect_set, array

default_reference.show()                                                                                           
+--------+----------+------+--------+--------------------+
|DomainId|DomainName|ItemId|ItemCode|            ItemName|
+--------+----------+------+--------+--------------------+
|       0|   Default|     0|     N/S|Not Specified at ...|
|       0|   Default|     1|     N/F|    Lookup Not Found|
|       0|   Default|     2|     N/I|Not Implemented i...|
|       0|   Default|     3|     N/A|      Not Applicable|
|       0|   Default|     4|     N/D|         Not Defined|
|       0|   Default|     5|     O/R|   Consumer Override|
|       0|   Default|     6|     SCM|Standard Column M...|
+--------+----------+------+--------+--------------------+

map_ids = dict((int(e[0]), {'code':e[1],'name':e[2]}) for e in default_reference.agg(collect_set(array('ItemId','ItemCode','ItemName')).alias('id')).first().id) 
#map_ids
#{'6': {'code': 'SCM', 'name': 'Standard Column M...'},
# '2': {'code': 'N/I', 'name': 'Not Implemented i...'},
# '4': {'code': 'N/D', 'name': 'Not Defined'},
# '3': {'code': 'N/A', 'name': 'Not Applicable'},
# '1': {'code': 'N/F', 'name': 'Lookup Not Found'},
# '0': {'code': 'N/S', 'name': 'Not Specified at ...'},
# '5': {'code': 'O/R', 'name': 'Consumer Override'}}

cond = col('PrimaryLookupAttributeName').isNull() & col('SecondaryLookupAttributeName').isNull()

item_id = 4
item = map_ids[item_id]

NotDefined_filterDomainLookup = filterDomainLookup \
    .filter(cond) \
    .withColumn('OutputItemIdByValue', lit(item_id)) \
    .withColumn('OutputItemCodeByValue', lit(item['code'])) \
    .withColumn('OutputItemNameByValue', lit(item['name']))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...