I am getting an error when I submit my Spark program. The error is below:

client token: N/A
diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: Can not load class 'brickhouse.udf.collect.NumericRange' when registering the function 'numeric_range', please make sure it is on the classpath;

I am submitting with the spark-submit command below and referencing all of the jars that are used, but I still get the error about the Brickhouse jar when creating the temporary function:
/usr/hdp/2.6.5.3015-8/spark2/bin/spark-submit --master yarn --deploy-mode cluster \
  --class "com.test.example.Sample" /home/sshuser/project1_2.11-0.1.0-SNAPSHOT.jar \
  --jars /usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar,/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar,/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar \
  --driver-class-path /usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar \
  --executor-class-path /usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar
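
For what it's worth, as far as I understand spark-submit, everything after the application jar is handed to the main class as program arguments, and --executor-class-path is not a documented spark-submit option (the executor classpath is normally set with --conf spark.executor.extraClassPath). So a variant with the ordering I believe spark-submit expects would look like this (a sketch only, same jars, assuming they exist at these paths on every node):

/usr/hdp/2.6.5.3015-8/spark2/bin/spark-submit --master yarn --deploy-mode cluster \
  --class "com.test.example.Sample" \
  --jars /usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar,/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar,/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar \
  --driver-class-path /usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar \
  --conf spark.executor.extraClassPath=/usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar:/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar \
  /home/sshuser/project1_2.11-0.1.0-SNAPSHOT.jar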
The Spark version is 2.3. I use temporary functions to run my code; the snippet is below:
package com.test.example

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import com.databricks.spark.xml._
object Sample {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("XML Reading")
      .setMaster("yarn-cluster")
      .set("spark.driver.extraClassPath", "/usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar;/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar;/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar")
      .set("spark.executor.extraClassPath", "/usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar;/usr/hdp/2.6.5.3015-8/spark2/jars/spark-csv_2.10-1.5.0.jar;/usr/hdp/2.6.5.3015-8/spark2/jars/spark-xml_2.10-0.4.1.jar")
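    // Note: as far as I understand, spark.driver.extraClassPath set from application
    // code has no effect -- the driver JVM is already running by the time this
    // SparkConf is read, so it has to come from spark-submit (--driver-class-path)
    // or spark-defaults.conf. Also, on Linux classpath entries are separated with
    // ':', not ';'.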
    val spark = SparkSession.builder().config(conf).getOrCreate()

    spark.sql("ADD JAR /usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar")
    spark.sql("""CREATE TEMPORARY FUNCTION numeric_range AS 'brickhouse.udf.collect.NumericRange' USING JAR '/usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar'""")
    spark.sql("""CREATE TEMPORARY FUNCTION array_index AS 'brickhouse.udf.collect.ArrayIndexUDF' USING JAR '/usr/hdp/2.6.5.3015-8/spark2/jars/brickhouse-0.6.0.jar'""")
    val df = spark.read.format("com.databricks.spark.xml")
      .option("rowTag", "DataArea")
      .load("///scripts/Store_Site_edited.xml")
    val schema = df.schema
    val parse_df = spark.read.format("com.databricks.spark.xml")
      .option("rowTag", "DataArea")
      .schema(schema)
      .load("///scripts/Store_Site_edited.xml")
    parse_df.createOrReplaceTempView("complete_df")
spark.sql(s"""select current_timestamp() as load_dttm,
actioncode,cast(regexp_replace(substr(creationTime,1,19),"T"," ") as timestamp) as start_dttm,n as array_index,array_index(Location,n) as Location from complete_df lateral view numeric_range(size(Location)) n1 as n""").registerTempTable("main_table_1")
    spark.sql(
      """select cast(location.id._value as String) as siteid,
        |       load_dttm,
        |       location.id._schemeagencyid as site_schemeagencyid,
        |       location.id._type as site_type,
        |       location.name as site_name,
        |       cast(location.coordinate.Latitude.DegreeMeasure._value as String) as latitude_degreemeasure,
        |       location.Coordinate.Latitude.DegreeMeasure._unitCode as latitude_degreemeasure_unitcode,
        |       cast(location.coordinate.Longitude.DegreeMeasure._value as String) as longitude_degreemeasure
        |from main_table_1""".stripMargin)
      .write.option("header", "true").format("csv").mode("overwrite").save("///scripts/output6")
    spark.stop()
  }
}
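
Incidentally, the positional explode itself does not strictly need Brickhouse: Spark's built-in posexplode emits (position, element) pairs for an array column, so the numeric_range/array_index pair could in principle be replaced with something like the sketch below (the aliases t, n and loc are mine):

// Sketch only: same explode as above, but with the built-in posexplode instead of
// the Brickhouse UDFs; n is the element index, loc the element itself.
spark.sql(
  """select current_timestamp() as load_dttm,
    |       actioncode,
    |       cast(regexp_replace(substr(creationTime, 1, 19), "T", " ") as timestamp) as start_dttm,
    |       n as array_index,
    |       loc as Location
    |from complete_df
    |lateral view posexplode(Location) t as n, loc""".stripMargin)
  .createOrReplaceTempView("main_table_1")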
Not sure what I am missing; any suggestions would be much appreciated.