Есть два DataFrames
-
DataFrame 1: DataFrame
, содержащий полный адрес.
DataFrame 2: DataFrame
, содержащий базовые данные - Postcode
, District
& City / Town / Suburb
.
Цель проблемы - извлечь suburb
для DataFrame 1
из DataFrame 2
. Хотя OP явно не указал key
, в котором мы можем объединить два DataFrames, но Postcode
только кажется разумным выбором.
# Importing requisite functions
from pyspark.sql.functions import col,regexp_extract,split,udf
from pyspark.sql.types import StringType
Давайте создадим DataFrame 1
как df
. В этом DataFrame
нам нужно извлечь Postcode
. В Австралии все почтовые индексы 4-значный длиной , поэтому мы используем regexp_extract () для извлечения 4-значного числа из столбца string
.
df = sqlContext.createDataFrame([('BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia ',),
('HAY STREET HAYMARKET 2000, NSW, Australia',),
('SMART STREET FAIRFIELD 2165, NSW, Australia',),
('CLARENCE STREET SYDNEY 2000, NSW, Australia',)],
('FullAddress',))
df = df.withColumn('Postcode', regexp_extract('FullAddress', "(\\d{4})" , 1 ))
df.show(truncate=False)
+---------------------------------------------+--------+
|FullAddress |Postcode|
+---------------------------------------------+--------+
|BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |2113 |
|HAY STREET HAYMARKET 2000, NSW, Australia |2000 |
|SMART STREET FAIRFIELD 2165, NSW, Australia |2165 |
|CLARENCE STREET SYDNEY 2000, NSW, Australia |2000 |
+---------------------------------------------+--------+
Теперь, когда мы извлекли Postcode
, мы создали key
, чтобы соединить два DataFrames
. Давайте создадим DataFrame 2
, из которого нам нужно извлечь соответствующие suburb
.
df_City_Town_Suburb = sqlContext.createDataFrame([(2000,'Sydney','Dawes Point, Haymarket, Millers Point, Sydney, The Rocks'),
(2001,'Sydney','Sydney'),(2113,'Sydney','North Ryde')],
('Postcode','District','City_Town_Suburb'))
df_City_Town_Suburb.show(truncate=False)
+--------+--------+--------------------------------------------------------+
|Postcode|District|City_Town_Suburb |
+--------+--------+--------------------------------------------------------+
|2000 |Sydney |Dawes Point, Haymarket, Millers Point, Sydney, The Rocks|
|2001 |Sydney |Sydney |
|2113 |Sydney |North Ryde |
+--------+--------+--------------------------------------------------------+
Соединение двух DataFrames
с left
объединением -
df = df.join(df_City_Town_Suburb.select('Postcode','City_Town_Suburb'), ['Postcode'],how='left')
df.show(truncate=False)
+--------+---------------------------------------------+--------------------------------------------------------+
|Postcode|FullAddress |City_Town_Suburb |
+--------+---------------------------------------------+--------------------------------------------------------+
|2113 |BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |North Ryde |
|2165 |SMART STREET FAIRFIELD 2165, NSW, Australia |null |
|2000 |HAY STREET HAYMARKET 2000, NSW, Australia |Dawes Point, Haymarket, Millers Point, Sydney, The Rocks|
|2000 |CLARENCE STREET SYDNEY 2000, NSW, Australia |Dawes Point, Haymarket, Millers Point, Sydney, The Rocks|
+--------+---------------------------------------------+--------------------------------------------------------+
Разделение столбца City_Town_Suburb
на массив с использованием функции split () -
df = df.select('Postcode','FullAddress',split(col("City_Town_Suburb"), ",\s*").alias("City_Town_Suburb"))
df.show(truncate=False)
+--------+---------------------------------------------+----------------------------------------------------------+
|Postcode|FullAddress |City_Town_Suburb |
+--------+---------------------------------------------+----------------------------------------------------------+
|2113 |BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |[North Ryde] |
|2165 |SMART STREET FAIRFIELD 2165, NSW, Australia |null |
|2000 |HAY STREET HAYMARKET 2000, NSW, Australia |[Dawes Point, Haymarket, Millers Point, Sydney, The Rocks]|
|2000 |CLARENCE STREET SYDNEY 2000, NSW, Australia |[Dawes Point, Haymarket, Millers Point, Sydney, The Rocks]|
+--------+---------------------------------------------+----------------------------------------------------------+
Наконец, создаем UDF , чтобы проверить каждый элемент массива City_Town_Suburb
, существует ли он в столбце FullAddress
. Если существует единица, мы немедленно ее возвращаем, иначе возвращается None
.
def suburb(FullAddress,City_Town_Suburb):
# Check for the case where there is no Array, otherwise we will get an Error
if City_Town_Suburb == None:
return None
# Checking each and every Array element if it exists in 'FullAddress',
# and if a match is found, it's immediately returned.
for sub in City_Town_Suburb:
if sub.strip().upper() in FullAddress:
return sub.upper()
return None
suburb_udf = udf(suburb,StringType())
Применение этого UDF
-
df = df.withColumn('suburb', suburb_udf(col('FullAddress'),col('City_Town_Suburb'))).drop('City_Town_Suburb')
df.show(truncate=False)
+--------+---------------------------------------------+----------+
|Postcode|FullAddress |suburb |
+--------+---------------------------------------------+----------+
|2113 |BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |NORTH RYDE|
|2165 |SMART STREET FAIRFIELD 2165, NSW, Australia |null |
|2000 |HAY STREET HAYMARKET 2000, NSW, Australia |HAYMARKET |
|2000 |CLARENCE STREET SYDNEY 2000, NSW, Australia |SYDNEY |
+--------+---------------------------------------------+----------+