Error in pyspark code: out-of-bounds exception
0 votes / 06 April 2020
%livy2.pyspark
from pyspark_llap import HiveWarehouseSession

# Open a Hive Warehouse Connector session and read the source table
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("dev_fms_con_ap")
data = hive.executeQuery("select * from tds_test")

# Split by posting date: December 2019 onwards is scored, earlier data is history
df_test = data.filter(data.cpudt_cpudt >= 20191201)
df_train = data.filter(data.cpudt_cpudt < 20191201)

from pyspark.sql import functions as F

# Per key combination, collect the historically used section codes and WHT rates
df_train_taxrate = (df_train
    .groupby('Company_code_BUKRS', 'Vendor_Customer_Code_WT_ACCO', 'Expense_GL_HKONT',
             'PAN_J_1IPANNO', 'HSN_SAC_HSN_SAC')
    .agg(F.collect_set('Section_WT_QSCOD').alias('Unique Section Code'),
         F.collect_set('WHT_rate_QSATZ').alias('Unique W/tax rate')))

# Lower-case the grouping columns so they match the test-side names used below
for old in ['Company_code_BUKRS', 'Vendor_Customer_Code_WT_ACCO', 'Expense_GL_HKONT',
            'PAN_J_1IPANNO', 'HSN_SAC_HSN_SAC']:
    df_train_taxrate = df_train_taxrate.withColumnRenamed(old, old.lower())

# Wrap the scalar test-side values in single-element arrays so they can be
# intersected with the collected training-side sets below (lit() on a Column
# is a no-op, so the original lit wrapper is dropped)
df_test = df_test.withColumn('section_wt_qscod', F.array(df_test['section_wt_qscod']))
df_test = df_test.withColumn('wht_rate_qsatz', F.array(df_test['wht_rate_qsatz']))

# Attach the historical sets to every test row that shares the key combination
df_common_check = df_test.join(
    df_train_taxrate,
    on=['company_code_bukrs', 'vendor_customer_code_wt_acco', 'expense_gl_hkont',
        'pan_j_1ipanno', 'hsn_sac_hsn_sac'],
    how='left')

from pyspark.sql.functions import udf, size
from pyspark.sql.types import ArrayType, StringType

# UDF returning the intersection of two arrays (None if either side is null)
def make_intersect_udf(element_type):
    return udf(lambda x, y: list(set(x) & set(y)) if x is not None and y is not None else None,
               ArrayType(element_type))

string_intersect = make_intersect_udf(StringType())
# Rows whose key combination was seen in training
df3 = df_common_check.filter(df_common_check['Unique Section Code'].isNotNull())
# Anomaly: the section code used now never appeared for this combination before
section_anomaly = df3.where(size(string_intersect('section_wt_qscod', 'Unique Section Code')) <= 0)

section_anomaly = section_anomaly.withColumn('anomalous_flag', F.lit('Anomaly_Section code'))

# Build the remark inside the expression; str.format() on a Column would only
# embed the Column object's repr, not the row value
section_anomaly = section_anomaly.withColumn(
    'ml_remarks',
    F.when(F.col('Unique Section Code').isNotNull(),
           F.concat(F.lit('Previously used Section code : '),
                    F.concat_ws(', ', F.col('Unique Section Code')))))
section_anomaly = section_anomaly.drop('Unique Section Code', 'Unique W/tax rate')


# The string_intersect UDF defined above is reused for the tax-rate check
df3 = df_common_check.filter(df_common_check['Unique W/tax rate'].isNotNull())
# Anomaly: the WHT rate used now never appeared for this combination before
tax_anomaly = df3.where(size(string_intersect('wht_rate_qsatz', 'Unique W/tax rate')) <= 0)
tax_anomaly = tax_anomaly.withColumn('anomalous_flag', F.lit('Anomaly_Tax rate'))
tax_anomaly = tax_anomaly.withColumn(
    'ml_remarks',
    F.when(F.col('Unique W/tax rate').isNotNull(),
           F.concat(F.lit('Previously used Tax rate : '),
                    F.concat_ws(', ', F.col('Unique W/tax rate').cast('array<string>')))))
tax_anomaly = tax_anomaly.drop('Unique Section Code', 'Unique W/tax rate')
# Rows carrying an exemption (LDC) number are flagged separately
ldc = df_test.where(df_test.exemption_number_wt_wtexmn != '')
ldc = ldc.withColumn('anomalous_flag', F.lit('LDC')).withColumn('ml_remarks', F.lit('LDC Number exists'))

# Test rows whose key combination never occurred in training at all
df_new_entry = df_test.join(
    df_train_taxrate,
    ['company_code_bukrs', 'vendor_customer_code_wt_acco', 'expense_gl_hkont',
     'pan_j_1ipanno', 'hsn_sac_hsn_sac'],
    how='left_anti')
df_new_entry = df_new_entry.withColumn('anomalous_flag', F.lit('New Entry')) \
                           .withColumn('ml_remarks', F.lit('No combination found'))


from functools import reduce
from pyspark.sql import DataFrame

# Stack all four anomaly sets; unionByName matches columns by name, not position
dfs = [section_anomaly, tax_anomaly, ldc, df_new_entry]
df_final_anomaly = reduce(DataFrame.unionByName, dfs)

df_final_anomaly.show()

When df_final_anomaly.show() runs, we get the error below.

An error occurred while calling o587.showString. :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 85.0 failed 4 times,
most recent failure: Lost task 3.3 in stage 85.0 (TID 182, sidcpdatanode07.ril.com, executor 109):
java.lang.ArrayIndexOutOfBoundsException: 6
    at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:98)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.datasourcev2scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)

...
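The ColumnarBatch.column frame inside the DataSourceV2 scan suggests the failure happens while the executors read the table through the Hive Warehouse Connector's vectorized reader, before any of the Python logic runs. Since show() is the first action, all four branches are materialized at once; a quick way to narrow it down is to force each branch separately. A minimal diagnostic sketch (the loop and the count() calls are my own scaffolding, not part of the original job):

%livy2.pyspark
# Hypothetical diagnostic: materialize each branch on its own to see which
# one actually triggers the ArrayIndexOutOfBoundsException in the scan.
for name, frame in [('section_anomaly', section_anomaly),
                    ('tax_anomaly', tax_anomaly),
                    ('ldc', ldc),
                    ('df_new_entry', df_new_entry)]:
    try:
        print(name, frame.count())   # count() forces the full plan for this branch
    except Exception as exc:
        print(name, 'FAILED:', exc)

If every branch fails individually, the problem is in the shared scan of tds_test rather than in any of the anomaly rules.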