Использование словаря в функции regexp_replace в pyspark - PullRequest
0 голосов
/ 08 мая 2018

Я хочу выполнить операцию regexp_replace в столбце фрейма данных pyspark, используя словарь.

Словарь: {'RD':'ROAD','DR':'DRIVE','AVE':'AVENUE',....} В словаре будет около 270 пар ключей-значений.

Входные данные:

ID  | Address    
1   | 22, COLLINS RD     
2   | 11, HEMINGWAY DR    
3   | AVIATOR BUILDING    
4   | 33, PARK AVE MULLOHAND DR

желаемый выходной кадр данных:

ID   | Address  | Address_Clean    
1    | 22, COLLINS RD    | 22, COLLINS ROAD    
2    | 11, HEMINGWAY DR     | 11, HEMINGWAY DRIVE    
3    | AVIATOR BUILDING      | AVIATOR BUILDING    
4    | 33, PARK AVE MULLOHAND DR    | 33, PARK AVENUE MULLOHAND DRIVE

Я не могу найти какую-либо документацию в интернете. И если вы пытаетесь передать словарь, как показано ниже кодов -

data=data.withColumn('Address_Clean',regexp_replace('Address',dict))

Выдает ошибку «regexp_replace принимает 3 аргумента, 2 дано».

Набор данных будет иметь размер около 20 миллионов. Следовательно, решение UDF будет медленным (из-за строковой операции), и у нас нет доступа к версии 2.3.0 spark, которая поддерживает pandas_udf. Есть ли эффективный способ сделать это, кроме как использование цикла?

1 Ответ

0 голосов
/ 16 апреля 2019

Выдает вам эту ошибку, потому что regexp_replace () нуждается в трех аргументах:

regexp_replace('column_to_change','pattern_to_be_changed','new_pattern')

Но вы правы, вам не нужен UDF или цикл. Вам просто нужно еще несколько регулярных выражений и таблица каталогов, которая выглядит точно так же, как ваш исходный каталог :)

Вот мое решение для этого:

# You need to get rid of all the things you want to replace. 
# You can use the OR (|) operator for that. 
# You could probably automate that and pass it a string that looks like that instead but I will leave that for you to decide.

input_df = input_df.withColumn('start_address', sf.regexp_replace("original_address","RD|DR|etc...",""))


# You will still need the old ends in a separate column
# This way you have something to join on your directory table.

input_df = input_df.withColumn('end_of_address',sf.regexp_extract('original_address',"(.*) (.*)", 2))


# Now we join the directory table that has two columns - ends you want to replace and ends you want to have instead.

input_df = directory_df.join(input_df,'end_of_address')


# And now you just need to concatenate the address with the correct ending.

input_df = input_df.withColumn('address_clean',sf.concat('start_address','correct_end'))
...