How can I skip the ssl.truststore.password property when reading from Kafka with Spark Structured Streaming?
0 votes
/ 07 May 2020

I am trying to read from Kafka and write to a Kudu sink using Spark Structured Streaming. The read and write code is below.

I am using Spark 2.2.0.

val kafkaDataFrame = spark
  .readStream
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("kafka.bootstrap.servers", kafkaReaderConfig.kafka_brokers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism" , "GSSAPI")
  .option("kafka.ssl.truststore.location", kafkaReaderConfig.trust_jks_file_path)
  .option("kafka.sasl.jaas.config", jaas_config_str)
  .option("subscribe", kafkaReaderConfig.topics_set)
  .load()
  .selectExpr("CAST(value AS STRING) as value")

// After transformation:

  dfStrm.writeStream
    .option("checkpointLocation", path)
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .foreach(new KuduStreamWriter(tconfig))
    .outputMode("append")
    .start()
    .awaitTermination()

But the following exception is thrown:

20/05/07 10:59:00 INFO authenticator.AbstractLogin: Successfully logged in.
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT refresh thread started.
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT valid starting at: Thu May 07 10:58:17 UTC 2020
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT expires: Thu May 07 20:58:16 UTC 2020
20/05/07 10:59:00 INFO kerberos.KerberosLogin: TGT refresh sleeping until: Thu May 07 19:25:22 UTC 2020
20/05/07 10:59:00 ERROR streaming.StreamExecution: Query [id = 6d08d948-6c28-4282-b108-eac99c62e253, runId = 94d599d9-b7a1-4cdc-937f-8d98390fb509] terminated with error
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
        at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
        at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        ...
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        ...
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
        at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: SSL trust store is specified, but trust store password is not specified.
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
        ... 370 more
Caused by: org.apache.kafka.common.KafkaException: SSL trust store is specified, but trust store password is not specified.
        at org.apache.kafka.common.security.ssl.SslFactory.createTruststore(SslFactory.java:195)
        at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:115)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:83)
        ... 373 more

Q.1 Is there a way to skip the truststore.password property when reading from Kafka? (We don't have a password, and we don't need one to work with Kafka.)

Any suggestions or workarounds?

1 Answer

0 votes
/ 07 May 2020

First of all, I recommend upgrading to Spark 2.4.5. The Kafka source in Spark 2.2.0 is built against an old kafka-clients (0.10.0), which rejects a truststore without a password; the newer kafka-clients shipped with Spark 2.4.x treats ssl.truststore.password as optional and merely skips the integrity check of the JKS file when it is absent.
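For reference, a minimal sketch of the corresponding build change, assuming an sbt project on Scala 2.11 (the artifact names are the standard Spark ones; adjust the versions to your cluster):

// Hypothetical build.sbt fragment: pulls the Spark 2.4.5 Kafka source,
// whose transitive kafka-clients accepts a password-less truststore.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.4.5" % Provided,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
)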

If you don't need SSL at all, simply don't configure it:

val kafkaDataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaReaderConfig.kafka_brokers)
  .option("subscribe", kafkaReaderConfig.topics_set)
  .load()
  .selectExpr("CAST(value AS STRING) as value")
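If you do need SASL_SSL with Kerberos but your truststore has no password, then after the upgrade the reader from the question should work with the password option simply omitted. A sketch under that assumption, reusing the kafkaReaderConfig and jaas_config_str names from the question:

val kafkaDataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaReaderConfig.kafka_brokers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("kafka.ssl.truststore.location", kafkaReaderConfig.trust_jks_file_path)
  // no kafka.ssl.truststore.password: newer kafka-clients still loads the
  // JKS file and only skips its integrity check
  .option("kafka.sasl.jaas.config", jaas_config_str)
  .option("subscribe", kafkaReaderConfig.topics_set)
  .load()
  .selectExpr("CAST(value AS STRING) as value")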