Я разобрался в своей проблеме.При указании параметра протокола безопасности перед именем параметра должен стоять префикс «kafka».Это сбивает с толку, потому что для обычного потребителя Kafka опция просто security.protocol , но для целей настройки Spark bootstrap.servers и security.protocol (и предположительно любые другие параметры / свойства, которые вам могут понадобиться) должны иметь префикс kafka. .Мой оригинальный код был:
.option("security.protocol", "SASL_PLAINTEXT")
правильный вариант:
.option("kafka.security.protocol", "SASL_PLAINTEXT")
Вот полный код, который работает:
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
object Main {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.INFO)
Logger.getLogger("akka").setLevel(Level.INFO)
val spark = SparkSession.builder()
.master("local[*]")
.appName("myapp")
.config("spark.executor.extraJavaOptions", "java.security.auth.login.config=c:/krb/jaas.conf")
.getOrCreate()
import spark.implicits._
val lines = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9100,broker2:9100")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("subscribe", "mytopic")
.load()
val query = lines.select("value").writeStream.format("console").start()
query.awaitTermination()
}
}
Для справки, вот содержаниефайла jaas.conf:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="c:/krb/mykeytab.keytab"
principal="myaccount@mydomain.int"
storeKey=true
useTicketCache=false
serviceName="myservicename";
};