У меня проблемы при попытке обновить существующую таблицу Kudu через соединение jdb c. Указанная таблица используется как в качестве источника, так и цели, и я пытаюсь обновить несколько записей.
Мой процесс выглядит следующим образом:
Я использую соединение jdb c для чтения данных из входной таблицы Куду с именем db1.test_cls1
: таблица имеет следующие атрибуты :
consumer_id object
merchant_id object
log_date datetime64[ns]
beginning_period datetime64[ns]
end_period datetime64[ns]
first_trx_date datetime64[ns]
last_trx_date datetime64[ns]
tse float64
rec int32
freq int32
ag int32
mv float64
p_mo float64
p_al float64
p_freq float64
Jdb c соединение для чтения данных из таблицы: sourceDF = spark.read.jdbc(url=url, table=inputTable, properties=properties)
! Примечание - атрибуты с именем p_mo, p_al and p_freq
изначально пусты и обновлять после расчета в пределах Python кода
Следуя этому требованию, я преобразовываю свой фрейм данных Spark в фрейм данных Pandas и вычисляю атрибуты
p_mo, p_al and p_freq
.
sourceDF.createTempView("sourceDFView")
sql = """
SELECT consumer_id
, merchant_id
, log_date
, beginning_period
, end_period
, first_trx_date
, last_trx_date
, tse
, rec
, freq
, ag
, mv
FROM sourceDFView """
inputDF = sqlContext.sql(sql)
inputDF.dtypes
inputDF = inputDF.toPandas().reset_index()
def get_p_mo():
...code for calculation...
return p_mo
def get_p_al():
...code for calculation...
return p_al
def get_p_freq():
...code for calculation...
return p_freq
inputDF['p_mo'] = get_p_mo()
inputDF['p_al'] = get_p_al()
inputDF['p_freq'] = get_p_freq()
После завершения этой части я снова преобразовываю Pandas df в Spark df, чтобы иметь возможность обновлять таблицу
db1.test_cls1
через соединение jdb c. Это делается следующим образом:
tarDF: DataFrame = inputDF[[
'consumer_id',
'merchant_id',
'log_date',
'beginning_period',
'end_period',
'first_trx_date',
'last_trx_date',
'tse',
'rec',
'freq',
'ag',
'mv',
'p_mo',
'p_al',
'p_freq']]
inputDF_Prob = spark.createDataFrame(tarDF, schema=schema)
inputDF_Prob.createTempView("targetDF")
sqlTarget = """
UPDATE
sourceDFView
set
sourceDFView.p_al= targetDF.p_al
, sourceDFView.p_freq = targetDF.p_freq
, sourceDFView.p_mo = targetDF.p_mo
from
targetDF
inner join
sourceDFView
on
sourceDFView.consumer_id = targetDF.consumer_id
and
sourceDFView.merchant_id = targetDF.merchant_id
and
sourceDFView.log_date = targetDF.log_date
and
sourceDFView.beginning_period = targetDF.beginning_period
and
sourceDFView.end_period = targetDF.end_period
"""
targetDF = sqlContext.sql(sqlTarget)
targetDF.write \
.mode("append") \
.jdbc(url=url, table=inputTable,
properties=properties)
И это приводит к следующей ошибке:
py4j.protocol.Py4JJavaError: An error occurred while calling o32.sql.
: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'UPDATE' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 2, pos 8)
! Примечание: Я также пытался сделать обновление прямо в редакторе Hue, и он работал отлично.
За исключением этого, я пытался сохранить данные в другой целевой таблице, используя следующую часть кода, и она также работала нормально:
sqlTarget = """
SELECT
cast(consumer_id as string) consumer_id
, cast(merchant_id as string) merchant_id
, cast(log_date as timestamp) log_date
, cast(beginning_period as timestamp) beginning_period
, cast(end_period as timestamp) end_period
, cast(p_mo as double) p_mo
, cast(p_al as double) p_al
, cast(p_freq as double) p_freq
FROM targetDF
"""
targetDF.write \
.mode("append") \
.jdbc(url=url, table=targetTable,
properties=properties)
Может кто-нибудь помочь в этом вопросе как обновить таблицу Kudu через соединение JDB c?