Как мы можем использовать функцию density_rank () в pyspark? - PullRequest
0 голосов
/ 10 апреля 2020

Я запускаю скрипт pyspark, в котором я выполняю sql запрос и создаю фрейм данных. В запросе sql есть функция dens_rank (). Из-за этого запрос занимает слишком много времени, чтобы выполнить полностью.

Есть ли способ быстро выполнить запрос или мы можем обработать это на уровне pyspark? Доступна ли в pyspark какая-либо функция или метод для замены dens_rank () из sql?

SQL:

SELECT  DENSE_RANK() OVER(ORDER BY SOURCE_COLUMN_VALUE) AS SYSTEM_ID,SYSTEM_TABLE_NAME,SOURCE_ID,SOURCE_NAME,SOURCE_TABLE_NAME,SOURCE_COLUMN_NAME,SRC_VALUE AS SOURCE_COLUMN_VALUE,IM_INSERT_DT FROM (SELECT ID AS SOURCE_ID,'AMPIL' AS SOURCE_NAME,UPPER(concat(coalesce(addr_line_1,''),';',coalesce(addr_line_2,''),';',coalesce(city_1,''),';',coalesce(state_1,''),';',coalesce(zip_1,''),';',coalesce(cntry_1,''))) as  SOURCE_COLUMN_VALUE,concat(coalesce(addr_line1_src,''),';',coalesce(addr_line2_src,''),';',coalesce(city_src,''),';',coalesce(state_crc,''),';',coalesce(zip_1,''),';',coalesce(cntry_1,'')) as SRC_VALUE,SOURCE_TABLE_NAME,'ADDRESS' AS SYSTEM_TABLE_NAME,SOURCE_COLUMN_NAME,date_format(current_timestamp(),'yyyy-MM-dd hh:mm:ss') as IM_INSERT_DT from (SELECT ID,regexp_replace(addr_line_1,' ','') as addr_line_1,Upper(addr_line_1) as addr_line1_src,regexp_replace(addr_line_2,' ','') as addr_line_2 ,upper(addr_line_2) as addr_line2_src,regexp_replace(UPPER(coalesce(city,meli_city_nm)),' ','') as city_1,UPPER(coalesce(city,meli_city_nm)) as city_src,regexp_replace(coalesce(meli_stt_provncd,coalesce(vw_states_code.state_cd,state)),' ','') as state_1, coalesce(meli_stt_provncd,coalesce(vw_states_code.state_cd,state)) as state_crc,case when UPPER(coalesce(vw_states_code.country_cd,country)) = 'US' then 'USA' when UPPER(coalesce(vw_states_code.country_cd,country)) = 'CANADA' then 'CA' else regexp_replace(UPPER(coalesce(vw_states_code.country_cd,country)),' ','') end as cntry_1,case when UPPER(coalesce(vw_states_code.country_cd,country)) = 'US' then regexp_extract(substr(trim(regexp_replace(zip,' ','')),0,5),'^[0-9]{5}$',0) else regexp_replace(zip,' ','') end as zip_1,SOURCE_TABLE_NAME,SOURCE_COLUMN_NAME from vw_addr_stg LEFT JOIN (select * from vw_dmn_meli_zip where MELI_LAST_LN = 'L') vw_dmn_meli  on vw_addr_stg.zip=vw_dmn_meli.meli_zip_cd_base LEFT JOIN vw_states_code on (coalesce(meli_stt_provncd,state) = vw_states_code.state_cd or vw_states_code.state_nm = vw_addr_stg.state) LEFT JOIN vw_country_codes on vw_country_codes.country_name = vw_addr_stg.country))

1 Ответ

0 голосов
/ 10 апреля 2020

В pyspark вы можете использовать комбинацию функций Window и SQL, чтобы получить то, что вы хотите. Я не SQL свободно говорю и не тестировал решение, но что-то подобное может помочь вам:

import pyspark.sql.Window as psw
import pyspark.sql.functions as psf

w = psw.Window.partitionBy("SOURCE_COLUMN_VALUE")
df.withColumn("SYSTEM_ID", dense_rank().over(w))

Вы можете найти do c для dense_rank здесь

...