I am trying to insert data into an existing partitioned table by creating a DataFrame in PySpark. I get the error below in both Spark 1 and Spark 2:
ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 641)
java.lang.ArrayIndexOutOfBoundsException: 52
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:227)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getAs(rows.scala:35)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.isNullAt(rows.scala:36)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.isNullAt(rows.scala:221)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:173)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:170)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:170)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:150)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Below is my code:
from pyspark.context import SparkContext
from pyspark.sql import HiveContext, SQLContext
from pyspark import SparkConf
from pyspark.sql.functions import lit, when, col
from pyspark.sql.types import IntegerType, LongType
from pyspark.sql.window import Window
conf = SparkConf().setAppName('WApp')
sc = SparkContext(conf=conf)
hiveContext = HiveContext(sc)
sqlContext = SQLContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition","true")
hiveContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")
def read_file(path):
    input_file = open(path, 'r')
    file_content = input_file.read()
    input_file.close()
    return file_content
query = read_file('/home/projects/select_query.sql')
df1 = hiveContext.sql(query)
df1.registerTempTable('df1_temp')
insert_statement1 = "Insert Into table pri_sp.org_financial_events partition(data_source_system, data_category, load_date, constant, posting_date) select id, event_enum, event_type, parent_id, src_ref_ext_id, event_date, created_ts, biz_tenant_id, sys_tenant_id, sender_receiver_type, account_number, acc_number_country_code, vendor_acquirer, presentment_currency_code, legal_entity, event_dr_cr_code, event_currency_code, tpv, transaction_count, event_amount, transaction_type, vendor_authorizer, inter_intra_company, gl_debit, gl_credit, status, source_system, event_key, good_bad_bucket_indicator, intra_cross_border_indicator, official_payment_indicator, original_transaction_id, payment_activity_id, product_id, reversal_status, transaction_reason, functional_amount, internal_external_account_indicator, sender_receiver_flag, extract_id, inverse_indicator, funding_instrument_country, event_time, process_step_id, is_ledger_impacting_y_n, is_dcc_transaction_y_n, base_id, data_source_system, data_category , load_date, constant, posting_date from df1_temp"
hiveContext.sql(insert_statement1)
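In case it is useful, here is a small sketch of how the column counts could be compared between the query output and the target table (it assumes nothing beyond the hiveContext and the table names already shown above; the target table's columns() include the partition columns):

# Sketch: compare how many columns the query produces with how many
# columns (data + partition columns) the target table declares.
src_cols = df1.columns
tgt_cols = hiveContext.table("pri_sp.org_financial_events").columns
print(len(src_cols), len(tgt_cols))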
Both the source and target tables are in ORC format. The source table has null values in some rows; I am not sure whether that is what is causing the problem. If it is, how can I handle the null values?
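If nulls are the cause, this is the kind of thing I could try before the insert, a minimal sketch using the standard DataFrame fillna API (the column names and replacement values below are placeholders taken from the insert statement's select list, not my real defaults):

# Sketch: replace nulls before registering the temp table.
# fillna with a dict applies per-column defaults; a plain string value
# fills the remaining string columns.
df1_clean = df1.fillna({'event_amount': 0, 'transaction_count': 0})
df1_clean = df1_clean.fillna('')
df1_clean.registerTempTable('df1_temp')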