Не удалось построить потребитель Kafka с хранилищем ключей в S3 - PullRequest
0 голосов
/ 09 октября 2019

У меня есть задание Spark, которое использует данные из защищенной темы Кафки. Это работает, когда truststore.jks физически присутствует там, где выполняется задание. Однако, если я указываю на мое S3-ведро для Spark, чтобы получить файл JKS, это не получается.

Вот как выглядит моя работа:

val kafkaReadStream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1")
    .option("subscribe", "my_topic")
    .option("kafka.ssl.truststore.location", "s3a://a-bucket/a-directory/truststore.jks"
    ...

Чтобы Spark мог связаться с моим AWSЯ настроил spark-shell, как указано ниже:

./spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
    --packages org.apache.hadoop:hadoop-aws:2.7.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 \
    --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
    --conf spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY \
    --conf spark.hadoop.fs.s3a.secret.key=$AWS_SECRET_KEY

Это ошибка, которую я получаю:

org.apache.spark.SparkException: Exception thrown in awaitResult:
  ...
  ... 57 elided
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596)
  at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
  at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:86)
  at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchTopicPartitions$1.apply(KafkaOffsetReader.scala:119)
  at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchTopicPartitions$1.apply(KafkaOffsetReader.scala:116)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anon$2$$anon$1.run(KafkaOffsetReader.scala:59)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore s3a://a-bucket/a-directory/truststore.jks of type JKS
  at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153)
  at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
  at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
  at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
  ... 11 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore s3a://a-bucket/a-directory/truststore.jks of type JKS
  at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:137)
  at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
  ... 15 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore s3a://a-bucket/a-directory/truststore.jks of type JKS
  at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:330)
  at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:226)
  at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:135)
  ... 16 more
Caused by: java.io.FileNotFoundException: s3a:/adlearner-featurestore-us-west-2/kafka-configs/truststore.jks (No such file or directory)
  at java.io.FileInputStream.open0(Native Method)

Я точно знаю, что мои настройки на S3 верны, так какЯ могу прочитать текстовый файл в том же каталоге, что и этот JKS-файл, запустив его в оболочке:

spark.read.textFile("s3a://a-bucket/a-directory/test.txt").show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...