PySpark - как обновить таблицу Kudu, используя соединение jdb c? - PullRequest
1 голос
/ 11 февраля 2020

У меня проблемы при попытке обновить существующую таблицу Kudu через соединение jdb c. Указанная таблица используется как в качестве источника, так и цели, и я пытаюсь обновить несколько записей.

Мой процесс выглядит следующим образом:

  1. Я использую соединение jdb c для чтения данных из входной таблицы Куду с именем db1.test_cls1: таблица имеет следующие атрибуты :

    consumer_id                   object
    merchant_id                   object
    log_date              datetime64[ns]
    beginning_period      datetime64[ns]
    end_period            datetime64[ns]
    first_trx_date        datetime64[ns]
    last_trx_date         datetime64[ns]
    tse                          float64
    rec                            int32
    freq                           int32
    ag                             int32
    mv                           float64
    p_mo                         float64
    p_al                         float64
    p_freq                       float64
    

Jdb c соединение для чтения данных из таблицы: sourceDF = spark.read.jdbc(url=url, table=inputTable, properties=properties)

! Примечание - атрибуты с именем p_mo, p_al and p_freq изначально пусты и обновлять после расчета в пределах Python кода

Следуя этому требованию, я преобразовываю свой фрейм данных Spark в фрейм данных Pandas и вычисляю атрибуты p_mo, p_al and p_freq.

sourceDF.createTempView("sourceDFView")

sql = """
    SELECT consumer_id
          , merchant_id
          , log_date
          , beginning_period
          , end_period
          , first_trx_date
          , last_trx_date
          , tse
          , rec
          , freq
          , ag
          , mv
        FROM sourceDFView """

inputDF = sqlContext.sql(sql)
inputDF.dtypes

inputDF = inputDF.toPandas().reset_index()


def get_p_mo():
    ...code for calculation...
    return p_mo

def get_p_al():
    ...code for calculation...
    return p_al

def get_p_freq():
    ...code for calculation...
    return p_freq

inputDF['p_mo'] = get_p_mo()
inputDF['p_al'] = get_p_al()
inputDF['p_freq'] = get_p_freq()
После завершения этой части я снова преобразовываю Pandas df в Spark df, чтобы иметь возможность обновлять таблицу db1.test_cls1 через соединение jdb c. Это делается следующим образом:

tarDF: DataFrame = inputDF[[ 'consumer_id', 'merchant_id', 'log_date', 'beginning_period', 'end_period', 'first_trx_date', 'last_trx_date', 'tse', 'rec', 'freq', 'ag', 'mv', 'p_mo', 'p_al', 'p_freq']]

inputDF_Prob = spark.createDataFrame(tarDF, schema=schema)

inputDF_Prob.createTempView("targetDF")

sqlTarget = """
        UPDATE 
            sourceDFView
        set 
            sourceDFView.p_al= targetDF.p_al
            , sourceDFView.p_freq = targetDF.p_freq
            , sourceDFView.p_mo = targetDF.p_mo
        from
            targetDF
        inner join 
            sourceDFView
        on 
            sourceDFView.consumer_id = targetDF.consumer_id
        and 
            sourceDFView.merchant_id = targetDF.merchant_id
        and 
            sourceDFView.log_date = targetDF.log_date
        and 
            sourceDFView.beginning_period = targetDF.beginning_period
        and 
            sourceDFView.end_period = targetDF.end_period
    """

targetDF = sqlContext.sql(sqlTarget)

targetDF.write \
    .mode("append") \
    .jdbc(url=url, table=inputTable,
          properties=properties)

И это приводит к следующей ошибке:

py4j.protocol.Py4JJavaError: An error occurred while calling o32.sql.
: org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'UPDATE' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 2, pos 8)

! Примечание: Я также пытался сделать обновление прямо в редакторе Hue, и он работал отлично.

За исключением этого, я пытался сохранить данные в другой целевой таблице, используя следующую часть кода, и она также работала нормально:

sqlTarget = """
    SELECT  
          cast(consumer_id as string) consumer_id
        , cast(merchant_id as string) merchant_id
        , cast(log_date as timestamp) log_date
        , cast(beginning_period as timestamp) beginning_period
        , cast(end_period as timestamp) end_period
        , cast(p_mo as double) p_mo
        , cast(p_al as double) p_al
        , cast(p_freq as double) p_freq
    FROM targetDF  
    """
targetDF.write \
    .mode("append") \
    .jdbc(url=url, table=targetTable,
          properties=properties)

Может кто-нибудь помочь в этом вопросе как обновить таблицу Kudu через соединение JDB c?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...