java.lang.ArrayIndexOutOfBoundsException when inserting data into an existing partitioned Hive table

I am trying to insert data into an existing partitioned Hive table by creating a DataFrame in PySpark. I get the error below in both Spark 1 and Spark 2:

ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 641)
java.lang.ArrayIndexOutOfBoundsException: 52
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:227)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getAs(rows.scala:35)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.isNullAt(rows.scala:36)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.isNullAt(rows.scala:221)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:173)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:170)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:170)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:150)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:150)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Below is my code:

from pyspark.context import SparkContext
from pyspark.sql import HiveContext, SQLContext
from pyspark import SparkConf
from pyspark.sql.functions import lit, when, col
from pyspark.sql.types import IntegerType, LongType
from pyspark.sql.window import Window

conf = SparkConf().setAppName('WApp')
sc = SparkContext(conf=conf)
hiveContext = HiveContext(sc)
sqlContext = SQLContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition","true")
hiveContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")


def read_file(path):
    input_file = open(path, 'r')
    file_content = input_file.read()
    input_file.close()
    return file_content

query = read_file('/home/projects/select_query.sql')

df1 = hiveContext.sql(query)
df1.registerTempTable('df1_temp')


insert_statement1 = """
Insert Into table pri_sp.org_financial_events
partition(data_source_system, data_category, load_date, constant, posting_date)
select
    id, event_enum, event_type, parent_id, src_ref_ext_id, event_date, created_ts,
    biz_tenant_id, sys_tenant_id, sender_receiver_type, account_number,
    acc_number_country_code, vendor_acquirer, presentment_currency_code, legal_entity,
    event_dr_cr_code, event_currency_code, tpv, transaction_count, event_amount,
    transaction_type, vendor_authorizer, inter_intra_company, gl_debit, gl_credit,
    status, source_system, event_key, good_bad_bucket_indicator,
    intra_cross_border_indicator, official_payment_indicator, original_transaction_id,
    payment_activity_id, product_id, reversal_status, transaction_reason,
    functional_amount, internal_external_account_indicator, sender_receiver_flag,
    extract_id, inverse_indicator, funding_instrument_country, event_time,
    process_step_id, is_ledger_impacting_y_n, is_dcc_transaction_y_n, base_id,
    data_source_system, data_category, load_date, constant, posting_date
from df1_temp
"""

hiveContext.sql(insert_statement1)

Both the source and target tables are in ORC format. The source table has null values in some rows, and I am not sure whether that is what causes the problem. If it is, how can I handle the null values?
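In case nulls do turn out to matter, below is a minimal sketch of one way to replace them before running the insert. This is only an illustration, not a confirmed fix for the exception; the column names and replacement values are examples picked from the select list above.

# Illustrative sketch: fill nulls in a few columns before registering the temp table.
# DataFrame.na.fill (alias fillna) accepts a dict of column -> replacement value
# and is available in both Spark 1.x and 2.x.
df1_filled = df1.na.fill({'event_amount': 0.0, 'transaction_count': 0, 'event_type': 'UNKNOWN'})
df1_filled.registerTempTable('df1_temp')

An equivalent approach inside the SQL itself would be to wrap the affected columns in nvl(...) or coalesce(...) in the select list.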

...