У меня есть sql, в основном это соединение двух таблиц и получение результата admm_sk , если значение admm_sk равно NULL , тогда искровой UDF получит поиск вызова в третьейтаблица, если нет, то получите результат.Как я могу использовать эту функцию в spark sql, так как Spark не позволяет регистрироваться как UDF?
UDF Spark
def GeneratedAccommSk(localHash):
query = 'select accommodation_sk from staging.accomm_dim where accomm_hash="{}"'.format(localHash)
accommodationSk_Df=spark.sql(query)
accomm_count=accommSk_Df.filter(accommSk_Df.accomm_sk.isNotNull()).count()
if accomm_count != 0:
accomm_sk=accommSk_Df.select('accomm_sk').collect()[0].asDict()['accomm_sk']
else:
func = sc._gateway.jvm.RandomNumberGenerator()
accom_sk=func.generateRandomNumber().encode('ascii', 'ignore')
return accom_sk
SQL Spark:
rate_fact_df=spark.sql("""
*Calling GeneratedAccommSk UDF*
select case when accomm_sk IS NOT NULL THEN accommodation_sk
ELSE GeneratedAccommSk(a.accommhash) END
from
staging.contract_test a
join
dim.accomm_dim b
on (a.accomm_hash)= b.accommodation_hash
""")