I am trying to read from Kafka and write to a Kudu sink using Spark Structured Streaming. The read and write code is below.
I am using Spark 2.2.0.
val kafkaDataFrame = spark
.readStream
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("kafka.bootstrap.servers", kafkaReaderConfig.kafka_brokers)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism" , "GSSAPI")
.option("kafka.ssl.truststore.location", kafkaReaderConfig.trust_jks_file_path)
.option("kafka.sasl.jaas.config", jaas_config_str)
.option("subscribe", kafkaReaderConfig.topics_set)
.load()
.selectExpr("CAST(value AS STRING) as value")
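For completeness, jaas_config_str referenced above is built elsewhere in our code; it is roughly the following GSSAPI/Kerberos entry (the keytab path and principal below are placeholders, not our real values):

// Rough sketch of the JAAS entry passed via kafka.sasl.jaas.config;
// keytab path and principal are placeholders only.
val jaas_config_str =
  """com.sun.security.auth.module.Krb5LoginModule required
    |useKeyTab=true
    |storeKey=true
    |keyTab="/path/to/client.keytab"
    |principal="client@EXAMPLE.COM";""".stripMargin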
// After transformations:
import org.apache.spark.sql.streaming.Trigger // needed for Trigger.ProcessingTime below
dfStrm.writeStream
.option("checkpointLocation",path)
.trigger(Trigger.ProcessingTime("10 seconds"))
.foreach(new KuduStreamWriter(tconfig))
.outputMode("append")
.start()
.awaitTermination()
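KuduStreamWriter is our custom sink. A minimal sketch of its shape is below; the bodies and the type of tconfig are illustrative stand-ins, not our exact code (the real class builds a Kudu client from tconfig and upserts each row):

import org.apache.spark.sql.{ForeachWriter, Row}

// Illustrative skeleton of the custom Kudu sink used above; actual
// session handling and row conversion are omitted.
class KuduStreamWriter(tconfig: Map[String, String]) extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    // open a Kudu client/session for this partition and epoch
    true
  }
  override def process(row: Row): Unit = {
    // convert the row and apply it to the Kudu table (e.g. as an upsert)
  }
  override def close(errorOrNull: Throwable): Unit = {
    // flush pending operations and close the session
  }
}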
But I get the following exceptions:
20/05/07 10:59:00 INFO authenticator.AbstractLogin: Successfully logged in.
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT refresh thread started.
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT valid starting at: Thu May 07 10:58:17 UTC 2020
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT expires: Thu May 07 20:58:16 UTC 2020
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT refresh sleeping until: Thu May 07 19:25:22 UTC 2020
20/05/07 10:59:00 ERROR streaming.StreamExecution: Query [id = 6d08d948-6c28-4282-b108-eac99c62e253, runId = 94d599d9-b7a1-4cdc-937f-8d98390fb509] terminated with error
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
...
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: SSL trust store is specified, but trust store password is not specified.
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
... 370 more
Caused by: org.apache.kafka.common.KafkaException: SSL trust store is specified, but trust store password is not specified.
at org.apache.kafka.common.security.ssl.SslFactory.createTruststore(SslFactory.java:195)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:115)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:83)
... 373 more
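Note where this fails: it is a client-side configuration check, thrown before any connection to the brokers is attempted and before the truststore file is even opened. Judging by the stack trace, org.apache.kafka.common.security.ssl.SslFactory#createTruststore does something equivalent to the following (a paraphrase, not the actual Kafka source):

object SslChecks {
  // Paraphrase of the validation implied by the stack trace:
  // a truststore location with no password is rejected outright.
  // Not the actual kafka-clients source code.
  def truststoreCheck(location: String, password: String): Unit = {
    if (location != null && password == null)
      throw new RuntimeException(
        "SSL trust store is specified, but trust store password is not specified.")
  }
}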
Q1: Is there a way to omit the truststore.password property when reading from Kafka? (We do not have a password, and we do not need one to work with Kafka.)
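If the password cannot be omitted with this client version, I assume the direct way to satisfy the check is to pass the standard Kafka option through Spark's "kafka." option prefix. The sketch below mirrors the reader above; the environment-variable name and "changeit" default are placeholders:

// Sketch only: same reader as above, plus the truststore password.
// The env-var name and the "changeit" fallback are placeholders.
val kafkaDataFrameWithPassword = spark
  .readStream
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("kafka.bootstrap.servers", kafkaReaderConfig.kafka_brokers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("kafka.ssl.truststore.location", kafkaReaderConfig.trust_jks_file_path)
  .option("kafka.ssl.truststore.password", sys.env.getOrElse("KAFKA_TRUSTSTORE_PASSWORD", "changeit"))
  .option("kafka.sasl.jaas.config", jaas_config_str)
  .option("subscribe", kafkaReaderConfig.topics_set)
  .load()
  .selectExpr("CAST(value AS STRING) as value")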
Any suggestions or workarounds?