Я подключаюсь к брокерам Kafka, защищённым с помощью SSL, и опрашиваю их каждые 60 секунд. Затем из полученных данных я создаю Spark DataFrame, но при записи его в таблицы Hive возникает ошибка, и приложение завершается.
Потребитель (consumer) работает в кластере Hadoop с включённой аутентификацией Kerberos.
Код потребителя использует следующее:
jaas.conf:
// JAAS login section named KafkaClient: selects Kafka's PLAIN login module with a
// username/password pair (values redacted here).
KafkaClient{
org.apache.kafka.common.security.plain.PlainLoginModule required
username = "***"
password = "***";
};
build.sbt
// Scala version the job is compiled against; `%%` below appends the matching
// binary suffix (_2.11) to each artifact name, so the suffix is not hard-coded.
scalaVersion := "2.11.8"
// Single Spark version shared by every Spark module.
val sparkVersion = "2.2.0"
libraryDependencies ++= Seq(
  // "provided": compiled against but not packaged; expected on the cluster classpath.
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // Not marked "provided", so these are packaged with the application.
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  // NOTE(review): the job is submitted with SPARK_KAFKA_VERSION=0.10; verify which
  // kafka-clients version is actually loaded on the executors at runtime.
  "org.apache.kafka" % "kafka-clients" % "0.11.0.2"
)
KafkaConsumer.scala
// Build (or reuse) the Spark session that backs both the streaming context and
// the DataFrame writes.
val spark = SparkSession.builder()
  .appName("SparkStreaming")
  // NOTE(review): only one SparkContext is created in this excerpt, so this flag
  // looks unnecessary — confirm before removing.
  .config("spark.driver.allowMultipleContexts", "true")
  .getOrCreate()
val sc = spark.sparkContext

// Kafka consumer settings: SASL/PLAIN over TLS, String keys and values,
// auto-commit disabled.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka-dev.xyz.com:9190",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "Scala_DEV",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "session.timeout.ms" -> "15000",
  "security.protocol" -> "SASL_SSL",
  "ssl.keystore.location" -> "ca-cert-sha1n2.jks",
  "ssl.keystore.password" -> "***",
  "ssl.truststore.password" -> "***",
  "ssl.truststore.location" -> "ca-cert-sha1n2.jks",
  "ssl.truststore.type" -> "JKS",
  "ssl.keystore.type" -> "JKS",
  "sasl.mechanism" -> "PLAIN",
  "ssl.key.password" -> ""
)

// One micro-batch every 60 seconds.
val ssc = new StreamingContext(sc, Seconds(60))
val preferredHosts = LocationStrategies.PreferConsistent
// Explicit starting offset: partition 0 of Test_Topic begins at offset 5.
val offsets = Map(new TopicPartition("Test_Topic", 0) -> 5L)
// `topics` is defined outside this excerpt.
val dstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  preferredHosts,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))

dstream.foreachRDD { rdd =>
  // An empty micro-batch would produce a DataFrame with no columns, which
  // insertInto rejects because it does not line up with the target table;
  // skip such batches instead of failing the job.
  if (!rdd.isEmpty()) {
    // Parse each record value as one JSON document; the schema is inferred per batch.
    val jsonDf = spark.read.json(rdd.map(_.value()))
    jsonDf.show()
    // insertInto resolves columns by position, not by name.
    jsonDf.write.insertInto("stage_load.Test_Table")
  }
}

// start() is side-effecting, so it is called with explicit parentheses.
ssc.start()
ssc.awaitTermination()
Я запускаю приведённый выше код следующей командой spark2-submit:
# Submit the assembly jar to YARN in client mode. --files lists jaas.conf and the JKS
# store to distribute with the job; the -Djava.security.auth.login.config options
# point both the driver and the executor JVMs at jaas.conf.
SPARK_KAFKA_VERSION=0.10 spark2-submit --master yarn --deploy-mode client --files jaas.conf,ca-cert-sha1n2.jks \
--driver-java-options "-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
KafkaConsumer-assembly-0.1.jar
Я вижу, что DataFrame заполняется данными, но при его записи в таблицы Hive в журналах Spark появляется следующее:
19/01/23 06:10:12 WARN scheduler.TaskSetManager: Lost task 8.0 in stage 2.0 (TID 10, dojo3s10039.test.com, executor 1): java.lang.NullPointerException
at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163)
at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:749)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:680)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:144)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:104)
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputCommitter(FileOutputFormat.java:309)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupCommitter(HadoopMapReduceCommitProtocol.scala:100)
at org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol.setupCommitter(SQLHadoopMapReduceCommitProtocol.scala:40)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupTask(HadoopMapReduceCommitProtocol.scala:217)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
19/01/23 06:10:12 INFO scheduler.TaskSetManager: Starting task 8.1 in stage 2.0 (TID 12, dojo3s10039.test.com, executor 1, partition 8, PROCESS_LOCAL, 26482 bytes)
19/01/23 06:10:12 INFO scheduler.TaskSetManager: Lost task 8.1 in stage 2.0 (TID 12) on dojo3s10039.test.com, executor 1: java.lang.NullPointerException (null) [duplicate 1]
19/01/23 06:10:12 INFO scheduler.TaskSetManager: Starting task 8.2 in stage 2.0 (TID 13, dojo3s10039.test.com, executor 1, partition 8, PROCESS_LOCAL, 26482 bytes)
19/01/23 06:10:13 INFO scheduler.TaskSetManager: Lost task 8.2 in stage 2.0 (TID 13) on dojo3s10039.test.com, executor 1: java.lang.NullPointerException (null) [duplicate 2]
19/01/23 06:10:13 INFO scheduler.TaskSetManager: Starting task 8.3 in stage 2.0 (TID 14, dojo3s10039.test.com, executor 1, partition 8, PROCESS_LOCAL, 26482 bytes)
19/01/23 06:10:13 INFO scheduler.TaskSetManager: Lost task 8.3 in stage 2.0 (TID 14) on dojo3s10039.test.com, executor 1: java.lang.NullPointerException (null) [duplicate 3]
19/01/23 06:10:13 ERROR scheduler.TaskSetManager: Task 8 in stage 2.0 failed 4 times; aborting job
19/01/23 06:10:13 INFO cluster.YarnScheduler: Cancelling stage 2
19/01/23 06:10:13 INFO cluster.YarnScheduler: Stage 2 was cancelled
19/01/23 06:10:13 INFO scheduler.DAGScheduler: ResultStage 2 (insertInto at KafkaConsumer.scala:162) failed in 1.303 s due to Job aborted due to stage failure: Task 8 in stage 2.0 failed 4 times, most recent failure: Lost task 8.3 in stage 2.0 (TID 14, dojo3s10039.test.com, executor 1): java.lang.NullPointerException
at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163)
at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:749)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:680)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:144)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:104)
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputCommitter(FileOutputFormat.java:309)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupCommitter(HadoopMapReduceCommitProtocol.scala:100)
at org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol.setupCommitter(SQLHadoopMapReduceCommitProtocol.scala:40)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupTask(HadoopMapReduceCommitProtocol.scala:217)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
19/01/23 06:10:13 INFO scheduler.DAGScheduler: Job 2 failed: insertInto at KafkaConsumer.scala:162, took 1.310604 s
19/01/23 06:10:13 ERROR datasources.FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 2.0 failed 4 times, most recent failure: Lost task 8.3 in stage 2.0 (TID 14, dojo3s10039.test.com, executor 1): java.lang.NullPointerException
at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163)
at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:749)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:680)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:144)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:104)
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputCommitter(FileOutputFormat.java:309)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupCommitter(HadoopMapReduceCommitProtocol.scala:100)
at org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol.setupCommitter(SQLHadoopMapReduceCommitProtocol.scala:40)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupTask(HadoopMapReduceCommitProtocol.scala:217)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
at org.apache.spark.sql.hive.execution.SaveAsHiveFile$class.saveAsHiveFile(SaveAsHiveFile.scala:86)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:66)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:195)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:322)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:308)
at com.test.spark.kafka.KafkaConsumer$$anonfun$main$1.apply(KafkaConsumer.scala:162)
at com.test.spark.kafka.KafkaConsumer$$anonfun$main$1.apply(KafkaConsumer.scala:95)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163)
at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:749)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:680)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:144)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:104)
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputCommitter(FileOutputFormat.java:309)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupCommitter(HadoopMapReduceCommitProtocol.scala:100)
at org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol.setupCommitter(SQLHadoopMapReduceCommitProtocol.scala:40)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupTask(HadoopMapReduceCommitProtocol.scala:217)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
... 3 more
По моим наблюдениям, приложение лишь время от времени успешно записывает DataFrame в таблицы Hive, а в большинстве случаев завершается сбоем с указанным выше исключением.