Вы сделали почти правильно, просто вам нужно использовать mapping_expr
в последовательности.
from pyspark.sql.functions import col, create_map, lit, when
from itertools import chain
values = [('s1','c1','p3'),('s2','c1','p3'),('s1','c3','p3'),('s3','c4','p4'),('s4','c5','p4'),('s2','c6','p4')]
df = sqlContext.createDataFrame(values,['col1','col2','col3'])
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| s1| c1| p3|
| s2| c1| p3|
| s1| c3| p3|
| s3| c4| p4|
| s4| c5| p4|
| s2| c6| p4|
+----+----+----+
Словарь, предоставленный вами и создающий его отображение
col1_map = {'s1' : 'apple', 's3' : 'orange'}
col2_map = {'c1' : 'potato', 'c6' : 'tomato'}
col3_map = {'p3' : 'ball', 'p4' : 'bat'}
#Applying the mapping of dictionary.
mapping_expr1 = create_map([lit(x) for x in chain(*col1_map.items())])
mapping_expr2 = create_map([lit(x) for x in chain(*col2_map.items())])
mapping_expr3 = create_map([lit(x) for x in chain(*col3_map.items())])
Наконец, применениеcreate_map()
подряд. Кроме того, все, что я делаю, это проверяю, есть ли после операции на col1/col2
все еще нулевой результат, который можно проверить с помощью функции isNull()
.
df=df.withColumn('col4', mapping_expr1.getItem(col('col1')))
df=df.withColumn('col4', when(col('col4').isNull(),mapping_expr2.getItem(col('col2'))).otherwise(col('col4')))
df=df.withColumn('col4', when(col('col4').isNull(),mapping_expr3.getItem(col('col3'))).otherwise(col('col4')))
df.show()
+----+----+----+------+
|col1|col2|col3| col4|
+----+----+----+------+
| s1| c1| p3| apple|
| s2| c1| p3|potato|
| s1| c3| p3| apple|
| s3| c4| p4|orange|
| s4| c5| p4| bat|
| s2| c6| p4|tomato|
+----+----+----+------+