Попробуйте что-то вроде следующего:
from pyspark.sql.functions import lit, col, when
cond = col('PrimaryLookupAttributeName').isNull() & col('SecondaryLookupAttributeName').isNull()
# or in your example:
# (col("PrimaryLookupAttributeName") == "") & (col("SecondaryLookupAttributeName") == "")
df1 = filterDomainLookup \
.withColumn('OutputItemIdByValue', when(cond, lit('4')).otherwise(col('OutputItemIdByValue'))) \
.withColumn('OutputItemCodeByValue', when(cond, lit('N/D')).otherwise(col('OutputItemCodeByValue'))) \
.withColumn('OutputItemNameByValue', when(cond, lit('Not Defined')).otherwise(col('OutputItemNameByValue')))
df1.select('DomainName', 'PrimaryLookupAttributeName', *[c for c in filterDomainLookup.columns if c.startswith('Out')]).show()
#+-----------------+--------------------------+-------------------+---------------------+---------------------+
#| DomainName|PrimaryLookupAttributeName|OutputItemIdByValue|OutputItemCodeByValue|OutputItemNameByValue|
#+-----------------+--------------------------+-------------------+---------------------+---------------------+
#|LeaseRecoveryType| null| 4| N/D| Not Defined|
#|LeaseRecoveryType| LeaseRecoveryType| 11| null| Gross|
#|LeaseRecoveryType| LeaseRecoveryType| 18| null| Modified Gross|
#| LeaseStatus| LeaseStatus| 10| null| Active|
#| LeaseStatus| LeaseStatus| 10| null| Terminated|
#| LeaseStatus| LeaseStatus| 11| null| Expired|
#| LeaseStatus| LeaseStatus| 10| null| Pending|
#+-----------------+--------------------------+-------------------+---------------------+---------------------+
Обновление: как описано в другом сообщении, когда ItemID, ItemCode и ItemName извлекаются из default_reference, создайте сопоставление между ItemId и ItemName и ItemCode:
from pyspark.sql.functions import collect_set, array
default_reference.show()
+--------+----------+------+--------+--------------------+
|DomainId|DomainName|ItemId|ItemCode| ItemName|
+--------+----------+------+--------+--------------------+
| 0| Default| 0| N/S|Not Specified at ...|
| 0| Default| 1| N/F| Lookup Not Found|
| 0| Default| 2| N/I|Not Implemented i...|
| 0| Default| 3| N/A| Not Applicable|
| 0| Default| 4| N/D| Not Defined|
| 0| Default| 5| O/R| Consumer Override|
| 0| Default| 6| SCM|Standard Column M...|
+--------+----------+------+--------+--------------------+
map_ids = dict((int(e[0]), {'code':e[1],'name':e[2]}) for e in default_reference.agg(collect_set(array('ItemId','ItemCode','ItemName')).alias('id')).first().id)
#map_ids
#{'6': {'code': 'SCM', 'name': 'Standard Column M...'},
# '2': {'code': 'N/I', 'name': 'Not Implemented i...'},
# '4': {'code': 'N/D', 'name': 'Not Defined'},
# '3': {'code': 'N/A', 'name': 'Not Applicable'},
# '1': {'code': 'N/F', 'name': 'Lookup Not Found'},
# '0': {'code': 'N/S', 'name': 'Not Specified at ...'},
# '5': {'code': 'O/R', 'name': 'Consumer Override'}}
cond = col('PrimaryLookupAttributeName').isNull() & col('SecondaryLookupAttributeName').isNull()
item_id = 4
item = map_ids[item_id]
NotDefined_filterDomainLookup = filterDomainLookup \
.filter(cond) \
.withColumn('OutputItemIdByValue', lit(item_id)) \
.withColumn('OutputItemCodeByValue', lit(item['code'])) \
.withColumn('OutputItemNameByValue', lit(item['name']))