Как исправить org.apache.spark.sql.AnalysisException при изменении порядка столбцов в кадре данных? - PullRequest
0 голосов
/ 29 августа 2018

Я пытаюсь загрузить данные из таблицы RDBMS в таблице Postgres to Hive в HDFS.

      val yearDF = spark.read.format("jdbc").option("url", connectionUrl)
                        .option("dbtable", s"(${query}) as year2017")
                        .option("user", devUserName).option("password", devPassword)
                        .option("numPartitions",15).load()

Таблица Hive динамически разделена на две колонки: source_system_name,period_year У меня есть имена столбцов в таблице метаданных: metatables

val spColsDF = spark.read.format("jdbc").option("url",hiveMetaConURL)
                    .option("dbtable", "(select partition_columns from metainfo.metatables where tablename='finance.xx_gl_forecast') as colsPrecision")
                    .option("user", metaUserName)
                    .option("password", metaPassword)
                    .load()

Я пытаюсь переместить столбцы раздела: source_system_name, period_year в конец dataFrame: yearDF, поскольку столбцы, используемые в динамическом разделении Hive, должны находиться в конце. Для этого я придумал следующую логику:

val partition_columns      = spColsDF.select("partition_columns").collect().map(_.getString(0)).toSeq
val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
val allCols                = allColsOrdered.map(coln => org.apache.spark.sql.functions.col(coln))
val resultDF               = yearDF.select(allCols:_*)

Когда я выполняю код, я получаю исключение: org.apache.spark.sql.AnalysisException, как показано ниже:

Exception in thread "main" 18/08/28 18:09:30 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
org.apache.spark.sql.AnalysisException: cannot resolve '`source_system_name,period_year`' given input columns: [cost_center, period_num, period_name, currencies, cc_channel, scenario, xx_pk_id, period_year, cc_region, reference_code, source_system_name, source_record_type, xx_last_update_tms, xx_last_update_log_id, book_type, cc_function, product_line, ptd_balance_text, project, ledger_id, currency_code, xx_data_hash_id, qtd_balance_text, pl_market, version, qtd_balance, period, ptd_balance, ytd_balance_text, xx_hvr_last_upd_tms, geography, year, del_flag, trading_partner, ytd_balance, xx_data_hash_code, xx_creation_tms, forecast_id, drm_org, account, business_unit, gl_source_name, gl_source_system_name];;
'Project [forecast_id#26L, period_year#27, period_num#28, period_name#29, drm_org#30, ledger_id#31L, currency_code#32, source_system_name#33, source_record_type#34, gl_source_name#35, gl_source_system_name#36, year#37, period#38, scenario#39, version#40, currencies#41, business_unit#42, account#43, trading_partner#44, cost_center#45, geography#46, project#47, reference_code#48, product_line#49, ... 20 more fields]
+- Relation[forecast_id#26L,period_year#27,period_num#28,period_name#29,drm_org#30,ledger_id#31L,currency_code#32,source_system_name#33,source_record_type#34,gl_source_name#35,gl_source_system_name#36,year#37,period#38,scenario#39,version#40,currencies#41,business_unit#42,account#43,trading_partner#44,cost_center#45,geography#46,project#47,reference_code#48,product_line#49,... 19 more fields] JDBCRelation((select forecast_id,period_year,period_num,period_name,drm_org,ledger_id,currency_code,source_system_name,source_record_type,gl_source_name,gl_source_system_name,year,period,scenario,version,currencies,business_unit,account,trading_partner,cost_center,geography,project,reference_code,product_line,book_type,cc_region,cc_channel,cc_function,pl_market,ptd_balance,qtd_balance,ytd_balance,xx_hvr_last_upd_tms,xx_creation_tms,xx_last_update_tms,xx_last_update_log_id,xx_data_hash_code,xx_data_hash_id,xx_pk_id,null::integer as del_flag,ptd_balance::character varying as ptd_balance_text,qtd_balance::character varying as qtd_balance_text,ytd_balance::character varying as ytd_balance_text from analytics.xx_gl_forecast where period_year='2017') as year2017) [numPartitions=1]

Но если я передаю те же имена столбцов другим способом, как показано ниже, код работает нормально:

val lastCols        = Seq("source_system_name","period_year")
val allColsOrdered  = yearDF.columns.diff(lastCols) ++ lastCols
val allCols         = allColsOrdered.map(coln => org.apache.spark.sql.functions.col(coln))
val resultDF        = yearDF.select(allCols:_*)

Может кто-нибудь сказать мне, какую ошибку я здесь делаю?

1 Ответ

0 голосов
/ 29 августа 2018

Если вы посмотрите на ошибку:

cannot resolve '`source_system_name,period_year`

Это означает, что следующая строка:

spColsDF.select("partition_columns").collect().map(_.getString(0)).toSeq

возвращает что-то вроде:

Array("source_system_name,period_year")

это означает, что оба имени столбца объединены и образуют первый элемент массива вместо отдельных элементов, как вы хотите.

Чтобы получить желаемый результат, вам нужно разделить его на ,. Например, следующее должно работать.

spColsDf.select("partition_columns").collect.flatMap(_.getAs[String](0).split(","))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...