Мы пытаемся загрузить данные из внешней таблицы Hive в управляемую таблицу; данные из внешней таблицы подвергаются некоторому преобразованию перед записью в управляемую таблицу. Данные в обеих таблицах разбиты на разделы по датам; восходящие данные для внешней таблицы находятся в текстовом формате (txt), а данные управляемой таблицы — в формате Parquet. И во внешней, и в соответствующей управляемой таблице есть несколько числовых столбцов (decimal, bigint и smallint).
Сценарий spark Scala используется для получения данных из внешней таблицы в управляемую таблицу, сценарий:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, TimestampType, Decimal}
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.SparkConf
// Enable dynamic partitioning so the INSERT in convert_rule can resolve the
// `dt` partition from the selected data instead of a static partition spec.
sqlContext.setConf("hive.exec.dynamic.partition","true")
sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")
// Let Spark use its native Parquet support for the Hive-managed table.
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet","true")

// Job parameters are passed through spark.driver.extraJavaOptions as
// "-D<path>,<delimiter>": drop the 2-character "-D" prefix, split on commas.
val sconf = new SparkConf()
val paramsString = sconf.get("spark.driver.extraJavaOptions")
val paramsSlice = paramsString.slice(2, paramsString.length)
val paramsArray = paramsSlice.split(",")
// Fail fast with a clear message instead of an opaque
// ArrayIndexOutOfBoundsException when a parameter is missing.
require(paramsArray.length >= 2,
  s"Expected at least 2 comma-separated parameters in spark.driver.extraJavaOptions, got: '$paramsString'")
val arg1 = paramsArray(0) // input file path prefix (directory of *.gz files)
val arg2 = paramsArray(1) // field delimiter of the input files
// Expected layout of the staged mng_auto_services feed: mostly free-text
// columns plus a few integer counters and DECIMAL(20,2) percent/amount fields.
// All columns are nullable.
val schema_mng_auto_services: StructType = {
  val dec202 = DataTypes.createDecimalType(20, 2)
  val columns = Seq(
    ("PRCS_ID", StringType),
    ("PRCS_INST_C", StringType),
    ("PRCS_TS", TimestampType),
    ("UV_D", StringType),
    ("PLAN_N", StringType),
    ("HCE_C", StringType),
    ("DVSN_NM", StringType),
    ("SSN_N", StringType),
    ("SRC_C", IntegerType),
    ("SRC_TYPE_X", StringType),
    ("FUND_C", StringType),
    ("REH_C", StringType),
    ("REHIRE_I", StringType),
    ("STAT_C", StringType),
    ("HIRE_D", StringType),
    ("NOTIFY_D", StringType),
    ("DESC_X", StringType),
    ("MIX_P", dec202),
    ("RES_C", StringType),
    ("DFRL_BUCKET1_NM", StringType),
    ("DFRL_BUCKET2_NM", StringType),
    ("DFRL2_P", dec202),
    ("BASE_D", StringType),
    ("CALC_D", StringType),
    ("ELECT_D", StringType),
    ("ENROL_D", StringType),
    ("NEW_BASE_D", StringType),
    ("CTRB_D", StringType),
    ("SETUPIND_C", StringType),
    ("DOH_C", StringType),
    ("GRPID_C", StringType),
    ("AE_MIX_USED_NM", StringType),
    ("AE_SRC_NM", StringType),
    ("AE_ENROLL_DAYS", StringType),
    ("AE_ENROLL_D", StringType),
    ("AE_INC_EXIST_C", StringType),
    ("AE_INC_ELIGIB_C", StringType),
    ("AE_LETTER_SNT_C", StringType),
    ("AE_LITERAT_KIT_NM", StringType),
    ("AE_REH_INC_C", StringType),
    ("AE_REH_NOTIFY_DAYS", IntegerType),
    ("AE_REH_ENROLL_DAYS", IntegerType),
    ("AE_REH_SETUP_DAYS", IntegerType),
    ("AE_SRC_C", IntegerType),
    ("AE_INCR_D", StringType),
    ("AE_INCR_A", dec202),
    ("AE_CAP_P", dec202),
    ("AE_EFF_D", StringType),
    ("DFRL_P", dec202),
    ("BAL_I", StringType)
  )
  StructType(columns.map { case (name, dtype) => StructField(name, dtype, nullable = true) }.toArray)
}
/**
 * Loads one batch of the external feed into the managed table
 * dcopr.mng_auto_services.
 *
 * Reads the gzipped delimited text files under `filepath` with the supplied
 * `schema`, registers them as a temp table, then inserts the rows (string
 * columns trimmed, joined against dcopr.mng_fmr_plans to derive
 * fmr_plan_ind) into the managed table, dynamically partitioned by `dt`
 * taken from the date part of PRCS_TS.
 *
 * @param filepath directory/prefix under which the input *.gz files live
 * @param schema   expected column layout of the delimited input
 * @param delim    field delimiter used in the input files
 */
def convert_rule(filepath: String, schema: StructType, delim: String): Unit = {
  println("before convert_rule "+filepath+"-"+schema)
  // Empty strings become NULLs so numeric columns don't fail to parse on
  // blank fields.
  val input_df_rule = sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("delimiter", delim)
    .option("nullValue", "")
    .option("treatEmptyValuesAsNulls", "true")
    .load(filepath + "*.gz")
    .cache()
  input_df_rule.registerTempTable("stg_rule_temp")
  sqlContext.sql("""insert into dcopr.mng_auto_services partition(dt)
select
trim(PRCS_ID),
trim(PRCS_INST_C),
PRCS_TS,
trim(UV_D),
trim(a.PLAN_N),
trim(HCE_C),
trim(DVSN_NM),
trim(SSN_N),
SRC_C,
trim(SRC_TYPE_X),
trim(FUND_C),
trim(REH_C),
trim(REHIRE_I),
trim(STAT_C),
trim(HIRE_D),
trim(NOTIFY_D),
trim(DESC_X),
MIX_P,
trim(RES_C),
trim(DFRL_BUCKET1_NM),
trim(DFRL_BUCKET2_NM),
DFRL2_P,
trim(BASE_D),
trim(CALC_D),
trim(ELECT_D),
trim(ENROL_D),
trim(NEW_BASE_D),
trim(CTRB_D),
trim(SETUPIND_C),
trim(DOH_C),
trim(GRPID_C),
trim(AE_MIX_USED_NM),
trim(AE_SRC_NM),
trim(AE_ENROLL_DAYS),
trim(AE_ENROLL_D),
trim(AE_INC_EXIST_C),
trim(AE_INC_ELIGIB_C),
trim(AE_LETTER_SNT_C),
trim(AE_LITERAT_KIT_NM),
trim(AE_REH_INC_C),
AE_REH_NOTIFY_DAYS,
AE_REH_ENROLL_DAYS,
AE_REH_SETUP_DAYS,
AE_SRC_C,
trim(AE_INCR_D),
AE_INCR_A,
AE_CAP_P,
trim(AE_EFF_D),
DFRL_P,
trim(BAL_I),
case when b.plan_n is null then 'N' else 'Y' end fmr_plan_ind,
substr(cast(prcs_ts as string),1,10) dt
FROM stg_rule_temp a left outer join dcopr.mng_fmr_plans b on trim(a.plan_n)=trim(b.plan_n) """)
  // Release the temp table and the cached input now that the insert is done,
  // so repeated invocations in the same session do not leak executor memory.
  sqlContext.dropTempTable("stg_rule_temp")
  input_df_rule.unpersist()
  println("after convert rule")
}
// Run the load with the path (arg1) and delimiter (arg2) parsed from
// spark.driver.extraJavaOptions above.
convert_rule(arg1,schema_mng_auto_services,arg2)
println("after fun call")
// Force the driver JVM to terminate once the load has finished.
System.exit(0)
Ошибка:
schema_mng_auto_services: org. apache .spark. sql .types.StructType = StructType (StructField (PRCS_ID, StringType, true), StructField (PRCS_INST_ C, StringType, true), StructField (PRCS_TS, TimestampType, true (true), StructField (String), StructField , верно), StructField (PLAN_N, StringType, true), StructField (HCE_ C, StringType, true), StructField (DVSN_NM, StringType, true), StructField (SSN_N, StringType, true), StructField (SRC_ * 10 IntegerType, true), StructField (SRC_TYPE_X, StringType, true), StructField (FUND_ C, StringType, true), StructField (REH_ C, StringType, true), StructField (REHIRE_I, StringType, true, StructType, true, StringType, true, C, StringType, true), StructField (HIRE_D, StringType, true), StructField (NOTIFY_D, StringType, true), StructField (DESC_X, StringType, true), StructField (MIX_P, DecimalType (20,2), true), StructField (RES_ C, StringType, правда) ... convert_rule: (filepath: String, схема: org. apache .spark. sql .types.StructType, delim: String) Единица измерения перед convert_rule / user / srvdcappp / staging / 2019-11-25 / 20191125_224625 / * AUTSV C -StructType (StructField (PRCS_ID, StringType, true), StructField (PRCS_INST_ C, StringType, true), StructField (PRCS_TS, TimestampType, true), StructField (UV_D, StringType, true, true) PLAN_N, StringType, true), StructField (HCE_ C, StringType, true), StructField (DVSN_NM, StringType, true), StructField (SSN_N, StringType, true), StructField (SRC_ C, IntegerType true, IntegerType true), (SRC_TYPE_X, StringType, true), StructField (FUND_ C, StringType, true), StructField (REH_ C, StringType, true), StructField (REHIRE_I, StringType, true), StructField (STAT_ C, String_ *1035*) true), StructField (HIRE_D, StringType, true), StructField (NOTIFY_D, StringType, true), StructField (DESC_X, StringType, true), StructField (MIX_P, DecimalType (20,2), true), StructField (RES_ C , StringType, true), StructField (DFRL_BUCKET1_NM, StringType, true), St ructField (DFRL_BUCKET2_NM, 
StringType, true), StructField (DFRL2_P, DecimalType (20,2), true), StructField (BASE_D, StringType, true), StructField (CALC_D, StringType, true, true), StructField (ELECTECT), ELECT StructField (ENROL_D, StringType, true), StructField (NEW_BASE_D, StringType, true), StructField (CTRB_D, StringType, true), StructField (SETUPIND_ C, StringType, true), StructField (DOH_ * 1038) , StructField (GRPID_ C, StringType, true), StructField (AE_MIX_USED_NM, StringType, true), StructField (AE_SRC_NM, StringType, true), StructField (AE_ENROLL_DAYS, StringTyF_TINEFINEFINEFINEFYTINEFINETINEFINE) TrueType, String (True), истинный тип (AE_INC_EXIST_ C, StringType, true), StructField (AE_INC_ELIGIB_ C, StringType, true), StructField (AE_LETTER_SNT_ C, StringType, true), StructField (AE_LT_TINE_REF) (AE_LT_TING_EINTING) , StringType, true), StructField (AE_REH_NOTIFY_DAYS, IntegerType, true), StructField (AE_REH_ENROLL_DAYS, IntegerType, true), StructField (AE_REH_SETUP_DAYS, Int egerType, true), StructField (AE_SRC_ C, IntegerType, true), StructField (AE_INCR_D, StringType, true), StructField (AE_INCR_A, DecimalType (20,2), true), StructField (AE_CAP_pe) 20 Dec, Dec , верно), StructField (AE_EFF_D, StringType, true), StructField (DFRL_P, DecimalType (20,2), true), StructField (BAL_I, StringType, true)) org. apache .spark.SparkException: задание прервано из-за Ошибка этапа: Задача 3 на этапе 1.0 не выполнена 4 раза, последний сбой: Потерянное задание 3.3 на этапе 1.0 (TID 37, dojo3s10101.fmr.com, исполнитель 10): java .lang. NumberFormatException: для входной строки: "N" в java .lang.NumberFormatException.forInputString (NumberFormatException. java: 65)
Я проверил входящие файлы данных на наличие строковых значений в числовых столбцах — плохих данных нет. Используемая версия Spark — 1.6.0, версия Hive — 1.1.0-cdh5.16.2, а версия Hadoop — 2.6.0-cdh5.16.2 (дистрибутив Cloudera).
Это проблема совместимости версий Spark, Hive и Hadoop?
Спасибо