I am trying to load data from an RDBMS table in Postgres into a Hive table on HDFS.
val yearDF = spark.read.format("jdbc").option("url", connectionUrl)
.option("dbtable", s"(${query}) as year2017")
.option("user", devUserName).option("password", devPassword)
.option("numPartitions",15).load()
The Hive table is dynamically partitioned on two columns: source_system_name, period_year
I have the partition column names stored in a metadata table: metatables
val spColsDF = spark.read.format("jdbc").option("url",hiveMetaConURL)
.option("dbtable", "(select partition_columns from metainfo.metatables where tablename='finance.xx_gl_forecast') as colsPrecision")
.option("user", metaUserName)
.option("password", metaPassword)
.load()
I am trying to move the partition columns, source_system_name and period_year, to the end of the DataFrame yearDF, because the columns used for Hive dynamic partitioning must come last.
To do that, I came up with the following logic:
val partition_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).toSeq
val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
val allCols = allColsOrdered.map(coln => org.apache.spark.sql.functions.col(coln))
val resultDF = yearDF.select(allCols:_*)
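The reordering step on its own can be sketched with plain Scala collections, without Spark. This is only an illustration under assumptions: ReorderSketch, moveToEnd, and the shortened column list are hypothetical names and sample data, not part of the actual job.

```scala
object ReorderSketch {
  // Moves the names in `last` to the end of `columns`, keeping the relative
  // order of the remaining names. Every element of `last` is appended as-is,
  // whether or not it actually occurs in `columns`.
  def moveToEnd(columns: Seq[String], last: Seq[String]): Seq[String] =
    columns.diff(last) ++ last

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for yearDF.columns (a small subset of the real list).
    val columns = Seq("forecast_id", "period_year", "period_num", "source_system_name", "account")
    // Partition columns given as separate names, one element each.
    val partitionCols = Seq("source_system_name", "period_year")

    println(moveToEnd(columns, partitionCols).mkString(", "))
    // prints: forecast_id, period_num, account, source_system_name, period_year
  }
}
```

The resulting names can then be mapped to col(...) and passed to select, as in the snippet above.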
When I run the Spark code, I get an org.apache.spark.sql.AnalysisException, as shown below:
Exception in thread "main" 18/08/28 18:09:30 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
org.apache.spark.sql.AnalysisException: cannot resolve '`source_system_name,period_year`' given input columns: [cost_center, period_num, period_name, currencies, cc_channel, scenario, xx_pk_id, period_year, cc_region, reference_code, source_system_name, source_record_type, xx_last_update_tms, xx_last_update_log_id, book_type, cc_function, product_line, ptd_balance_text, project, ledger_id, currency_code, xx_data_hash_id, qtd_balance_text, pl_market, version, qtd_balance, period, ptd_balance, ytd_balance_text, xx_hvr_last_upd_tms, geography, year, del_flag, trading_partner, ytd_balance, xx_data_hash_code, xx_creation_tms, forecast_id, drm_org, account, business_unit, gl_source_name, gl_source_system_name];;
'Project [forecast_id#26L, period_year#27, period_num#28, period_name#29, drm_org#30, ledger_id#31L, currency_code#32, source_system_name#33, source_record_type#34, gl_source_name#35, gl_source_system_name#36, year#37, period#38, scenario#39, version#40, currencies#41, business_unit#42, account#43, trading_partner#44, cost_center#45, geography#46, project#47, reference_code#48, product_line#49, ... 20 more fields]
+- Relation[forecast_id#26L,period_year#27,period_num#28,period_name#29,drm_org#30,ledger_id#31L,currency_code#32,source_system_name#33,source_record_type#34,gl_source_name#35,gl_source_system_name#36,year#37,period#38,scenario#39,version#40,currencies#41,business_unit#42,account#43,trading_partner#44,cost_center#45,geography#46,project#47,reference_code#48,product_line#49,... 19 more fields] JDBCRelation((select forecast_id,period_year,period_num,period_name,drm_org,ledger_id,currency_code,source_system_name,source_record_type,gl_source_name,gl_source_system_name,year,period,scenario,version,currencies,business_unit,account,trading_partner,cost_center,geography,project,reference_code,product_line,book_type,cc_region,cc_channel,cc_function,pl_market,ptd_balance,qtd_balance,ytd_balance,xx_hvr_last_upd_tms,xx_creation_tms,xx_last_update_tms,xx_last_update_log_id,xx_data_hash_code,xx_data_hash_id,xx_pk_id,null::integer as del_flag,ptd_balance::character varying as ptd_balance_text,qtd_balance::character varying as qtd_balance_text,ytd_balance::character varying as ytd_balance_text from analytics.xx_gl_forecast where period_year='2017') as year2017) [numPartitions=1]
But if I pass the same column names in a different way, as shown below, the code works fine:
val lastCols = Seq("source_system_name","period_year")
val allColsOrdered = yearDF.columns.diff(lastCols) ++ lastCols
val allCols = allColsOrdered.map(coln => org.apache.spark.sql.functions.col(coln))
val resultDF = yearDF.select(allCols:_*)
Can anyone tell me what mistake I am making here?