Я думаю, что мы должны следовать этому подходу -
- Получить отображение "Street_Name" и "Zip_Code" (отфильтровать нулевой Zip_Code)
- Соедините основной фрейм данных с фреймом данных Zip_Code, используя "Street_Name", и заполните "Почтовый индекс", если он не равен нулю в основном фрейме данных, иначе заполните из нашего фрейма данных Zip_Code.
Попробуйте этот код -
from pyspark.sql.types import *
from pyspark.sql.functions import col
schema = StructType([StructField('BOROUGH', IntegerType(), True),
StructField('Street_Name', StringType(), True),
StructField('Zip_Code', IntegerType(), True)])
data = [(2850662,'BRONX CITY ISLAND ROAD',10464),
(2850740,'BRONX CITY ISLAND ROAD',10464),
(2850749,'BRONX CITY ISLAND ROAD',None),
(2850919,'BRONX CITY ISLAND ROAD',10464),
(3491200,'BRONX CITY ISLAND ROAD',None)]
df = spark.createDataFrame(data,schema)
df_Zip_Code = df.filter(df.Zip_Code.isNotNull()).select('Street_Name','Zip_Code').distinct()
df.alias('a').\
join(df_Zip_Code.alias('b'),col('a.Street_Name') == col('b.Street_Name'), 'inner').\
selectExpr("a.BOROUGH AS BOROUGH","a.Street_Name AS Street_Name","CASE WHEN a.Zip_Code IS NULL THEN b.Zip_Code ELSE a.Zip_Code END AS Zip_Code" ).\
show()