Как вызывать функции с операциями над кадрами из spark sql? - PullRequest
0 голосов
/ 28 декабря 2018

У меня есть sql, в основном это соединение двух таблиц и получение результата admm_sk , если значение admm_sk равно NULL , тогда искровой UDF получит поиск вызова в третьейтаблица, если нет, то получите результат.Как я могу использовать эту функцию в spark sql, так как Spark не позволяет регистрироваться как UDF?

UDF Spark

def GeneratedAccommSk(localHash):
    query = 'select accommodation_sk from staging.accomm_dim where accomm_hash="{}"'.format(localHash)
    accommodationSk_Df=spark.sql(query)
    accomm_count=accommSk_Df.filter(accommSk_Df.accomm_sk.isNotNull()).count()
    if accomm_count != 0:
        accomm_sk=accommSk_Df.select('accomm_sk').collect()[0].asDict()['accomm_sk']
    else:
        func = sc._gateway.jvm.RandomNumberGenerator()
        accom_sk=func.generateRandomNumber().encode('ascii', 'ignore')
    return accom_sk

SQL Spark:

        rate_fact_df=spark.sql("""
*Calling GeneratedAccommSk UDF*
        select  case when accomm_sk IS NOT NULL THEN accommodation_sk 
    ELSE GeneratedAccommSk(a.accommhash) END 
        from 
        staging.contract_test a 
        join 
        dim.accomm_dim b 
        on (a.accomm_hash)= b.accommodation_hash
        """)

1 Ответ

0 голосов
/ 28 декабря 2018

Это не сработает по крайней мере по двум причинам:

В зависимости от размера accommSk_Df вы должны либо собрать его и использовать локальный объект ( Поиск в искровых фреймах данных ), либо выполнить еще одно соединение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...