У меня есть задание Spark, которое использует данные из защищенной темы Кафки. Это работает, когда truststore.jks
физически присутствует там, где выполняется задание. Однако, если я указываю на мое S3-ведро для Spark, чтобы получить файл JKS, это не получается.
Вот как выглядит моя работа:
val kafkaReadStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1")
.option("subscribe", "my_topic")
.option("kafka.ssl.truststore.location", "s3a://a-bucket/a-directory/truststore.jks"
...
Чтобы Spark мог связаться с моим AWSЯ настроил spark-shell
, как указано ниже:
./spark-2.4.4-bin-hadoop2.7/bin/spark-shell \
--packages org.apache.hadoop:hadoop-aws:2.7.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY \
--conf spark.hadoop.fs.s3a.secret.key=$AWS_SECRET_KEY
Это ошибка, которую я получаю:
org.apache.spark.SparkException: Exception thrown in awaitResult:
...
... 57 elided
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:86)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchTopicPartitions$1.apply(KafkaOffsetReader.scala:119)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchTopicPartitions$1.apply(KafkaOffsetReader.scala:116)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anon$2$$anon$1.run(KafkaOffsetReader.scala:59)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore s3a://a-bucket/a-directory/truststore.jks of type JKS
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
... 11 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore s3a://a-bucket/a-directory/truststore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:137)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
... 15 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore s3a://a-bucket/a-directory/truststore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:330)
at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:226)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:135)
... 16 more
Caused by: java.io.FileNotFoundException: s3a:/adlearner-featurestore-us-west-2/kafka-configs/truststore.jks (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
Я точно знаю, что мои настройки на S3 верны, так какЯ могу прочитать текстовый файл в том же каталоге, что и этот JKS-файл, запустив его в оболочке:
spark.read.textFile("s3a://a-bucket/a-directory/test.txt").show()