Streaming data between an SASL_SSL-secured Kafka cluster and a Kerberized Hadoop cluster using Spark Streaming

I am connecting to Kafka brokers secured with SSL and polling every 60 seconds. From the polled data I build a Spark DataFrame, but when the DataFrame is written to Hive tables the job throws an ERROR and the application terminates.

The consumer runs on a Kerberized Hadoop cluster.

The consumer code uses the following:

jaas.conf:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="***"
    password="***";
};
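
As an aside (not from the original post): kafka-clients 0.10.2 and later also accept the same login-module entry inline, through the sasl.jaas.config consumer property (KIP-85), which avoids shipping a separate JAAS file to every executor. A minimal sketch, with placeholder credentials:

// Assumed alternative to -Djava.security.auth.login.config: pass the
// PlainLoginModule entry inline as a consumer property (kafka-clients 0.10.2+).
val inlineJaas =
  """org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***";"""

// Then add it to the consumer configuration built below:
//   "sasl.jaas.config" -> inlineJaas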

build.sbt

scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % sparkVersion % "provided",
  "org.apache.spark" % "spark-sql_2.11" % sparkVersion % "provided",
  "org.apache.spark" % "spark-hive_2.11" % sparkVersion % "provided",
  "org.apache.spark" % "spark-streaming_2.11" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.kafka" % "kafka-clients" % "0.11.0.2"
)
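
The submit command further below uses KafkaConsumer-assembly-0.1.jar, which suggests the project builds a fat jar with sbt-assembly. A minimal sketch of that setup under this assumption (the plugin version and merge strategy are illustrative, not taken from the post):

// project/assembly.sbt (assumed)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

// build.sbt additions (assumed): discard the duplicate META-INF entries that
// the Spark and Kafka dependencies bring in, keep the first copy of the rest.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x                             => MergeStrategy.first
}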

KafkaConsumer.scala

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val spark = SparkSession.builder()
      .appName("SparkStreaming")
      .config("spark.driver.allowMultipleContexts", "true")
      .getOrCreate()

val sc = spark.sparkContext

val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka-dev.xyz.com:9190",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "Scala_DEV",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "session.timeout.ms" -> "15000",
      "security.protocol" -> "SASL_SSL",
      "ssl.keystore.location" -> "ca-cert-sha1n2.jks",
      "ssl.keystore.password" -> "***",
      "ssl.truststore.password" -> "***",
      "ssl.truststore.location" -> "ca-cert-sha1n2.jks",
      "ssl.truststore.type" -> "JKS",
      "ssl.keystore.type" -> "JKS",
      "sasl.mechanism" -> "PLAIN",
      "ssl.key.password" -> ""
    )

val ssc = new StreamingContext(sc, Seconds(60))

val preferredHosts = LocationStrategies.PreferConsistent

val offsets = Map(new TopicPartition("Test_Topic", 0) -> 5L)

val topics = List("Test_Topic")  // inferred from the offsets map above

val dstream = KafkaUtils.createDirectStream[String, String](
      ssc,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))

dstream.foreachRDD { rdd =>
  val json_df = spark.read.json(rdd.map(_.value()))
  json_df.show()
  json_df.write.insertInto("stage_load.Test_Table")
}

ssc.start()

ssc.awaitTermination()
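
One gap worth noting in the listing above: with enable.auto.commit set to false, nothing ever commits the consumed offsets back to Kafka. A minimal sketch of manual commits after a successful write, following the standard spark-streaming-kafka-0-10 pattern (this is an addition, not part of the original code):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

dstream.foreachRDD { rdd =>
  // Capture the offset ranges before any transformation re-partitions the RDD.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... build the DataFrame and insertInto the Hive table as above ...
  // Commit only after the write succeeds, so a failed batch is re-processed.
  dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}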

When I run the above code with the following spark2-submit command:

SPARK_KAFKA_VERSION=0.10 spark2-submit --master yarn --deploy-mode client --files jaas.conf,ca-cert-sha1n2.jks \
--driver-java-options "-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
KafkaConsumer-assembly-0.1.jar
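
For reference, on a Kerberized cluster it is common to also pass a principal and keytab so the YARN executors can obtain and renew HDFS delegation tokens; a variant of the same command under that assumption (the principal and keytab values are placeholders):

SPARK_KAFKA_VERSION=0.10 spark2-submit --master yarn --deploy-mode client \
--principal <user>@<REALM> --keytab /path/to/user.keytab \
--files jaas.conf,ca-cert-sha1n2.jks \
--driver-java-options "-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
KafkaConsumer-assembly-0.1.jar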

I can see the DataFrame being populated, but when it is written to the Hive tables Spark emits the following logs:

19/01/23 06:10:12 WARN scheduler.TaskSetManager: Lost task 8.0 in stage 2.0 (TID 10, dojo3s10039.test.com, executor 1): java.lang.NullPointerException
        at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163)
        at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381)
        at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186)
        at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570)
        at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
        at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
        at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68)
        at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66)
        at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58)
        at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:749)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:680)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:144)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:104)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputCommitter(FileOutputFormat.java:309)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupCommitter(HadoopMapReduceCommitProtocol.scala:100)
        at org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol.setupCommitter(SQLHadoopMapReduceCommitProtocol.scala:40)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupTask(HadoopMapReduceCommitProtocol.scala:217)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

19/01/23 06:10:12 INFO scheduler.TaskSetManager: Starting task 8.1 in stage 2.0 (TID 12, dojo3s10039.test.com, executor 1, partition 8, PROCESS_LOCAL, 26482 bytes)
19/01/23 06:10:12 INFO scheduler.TaskSetManager: Lost task 8.1 in stage 2.0 (TID 12) on dojo3s10039.test.com, executor 1: java.lang.NullPointerException (null) [duplicate 1]
19/01/23 06:10:12 INFO scheduler.TaskSetManager: Starting task 8.2 in stage 2.0 (TID 13, dojo3s10039.test.com, executor 1, partition 8, PROCESS_LOCAL, 26482 bytes)
19/01/23 06:10:13 INFO scheduler.TaskSetManager: Lost task 8.2 in stage 2.0 (TID 13) on dojo3s10039.test.com, executor 1: java.lang.NullPointerException (null) [duplicate 2]
19/01/23 06:10:13 INFO scheduler.TaskSetManager: Starting task 8.3 in stage 2.0 (TID 14, dojo3s10039.test.com, executor 1, partition 8, PROCESS_LOCAL, 26482 bytes)
19/01/23 06:10:13 INFO scheduler.TaskSetManager: Lost task 8.3 in stage 2.0 (TID 14) on dojo3s10039.test.com, executor 1: java.lang.NullPointerException (null) [duplicate 3]
19/01/23 06:10:13 ERROR scheduler.TaskSetManager: Task 8 in stage 2.0 failed 4 times; aborting job
19/01/23 06:10:13 INFO cluster.YarnScheduler: Cancelling stage 2
19/01/23 06:10:13 INFO cluster.YarnScheduler: Stage 2 was cancelled
19/01/23 06:10:13 INFO scheduler.DAGScheduler: ResultStage 2 (insertInto at KafkaConsumer.scala:162) failed in 1.303 s due to Job aborted due to stage failure: Task 8 in stage 2.0 failed 4 times, most recent failure: Lost task 8.3 in stage 2.0 (TID 14, dojo3s10039.test.com, executor 1): java.lang.NullPointerException
        (same NullPointerException stack trace as above)

Driver stacktrace:
19/01/23 06:10:13 INFO scheduler.DAGScheduler: Job 2 failed: insertInto at KafkaConsumer.scala:162, took 1.310604 s
19/01/23 06:10:13 ERROR datasources.FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 2.0 failed 4 times, most recent failure: Lost task 8.3 in stage 2.0 (TID 14, dojo3s10039.test.com, executor 1): java.lang.NullPointerException
        (same NullPointerException stack trace as above)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
        at org.apache.spark.sql.hive.execution.SaveAsHiveFile$class.saveAsHiveFile(SaveAsHiveFile.scala:86)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:66)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:195)
        at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:99)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
        at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:322)
        at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:308)
        at com.test.spark.kafka.KafkaConsumer$$anonfun$main$1.apply(KafkaConsumer.scala:162)
        at com.test.spark.kafka.KafkaConsumer$$anonfun$main$1.apply(KafkaConsumer.scala:95)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        (same NullPointerException stack trace as above)
        ... 3 more

From what I have observed, the application occasionally writes the DataFrame to the Hive tables successfully, but most of the time it fails with the exception above.

...