Я пытаюсь использовать защищенную тему Кафки (используя SASL_PLAINTEXT, метод ScramLogin).
Spark Версия 2.3.1
Scala 2.11
Кафка последний
Я использую структурированный поток Spark для его создания. Для этого я импортировал библиотеку: spark-sql-kafka-0-10_2.11-2.3.1
Импортирует старую версию (0.10.0.1) файла kafka-clients.jar.
Ниже мой client.jaas:
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
mechanism=SCRAM-SHA-512
security.protocol=SASL_PLAINTEXT
client.id="*****"
username="****"
password="*****";
};
Я использую ScramLoginModule, и указанный выше jar-файл kafka-client даже не имеет этого jar-файла. Итак, я добавил более позднюю версию jar-файла kafka Clients kafka-clients-1.1.1-cp1.jar
В моей записной книжке у меня есть следующий код:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
val kafkaSource = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host:port")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1)
.option("kafka.sasl.mechanism","SCRAM-SHA-512")
.option("kafka.security.protocol","SASL_PLAINTEXT")
.load()
val kafkaStream = kafkaSource.
select(
$"key" cast "string", // deserialize keys
$"value" cast "string", // deserialize values
$"topic",
$"partition",
$"offset",
$"timestamp",
$"timestampType")
import org.apache.spark.sql.ForeachWriter
kafkaStream.writeStream.foreach(new ForeachWriter[Row] {
override def process(row: Row): Unit = {
println("Processing : " + row.mkString(","))
}
override def close(errorOrNull: Throwable): Unit = {}
override def open(partitionId: Long, version: Long): Boolean = {
true
}
}).start().awaitTermination()
Из журналов драйверов видно, что я могу нормально подключиться.
18/09/05 13:09:46 INFO AbstractLogin: Successfully logged in.
18/09/05 13:09:46 INFO ConsumerConfig: ConsumerConfig values:
Но после этого ничего не происходит !! Он просто застревает после печати следующих строк:
18/09/05 13:09:47 INFO AppInfoParser: Kafka version : 0.10.0.1
18/09/05 13:09:47 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
Приведенная выше версия Kafka также очень сбивает с толку, так как я нигде не использую эту версию (хотя это была версия клиента kafka, которая автоматически импортируется при добавлении spark-sql-kafka-0-10_2.11-2.3 .1 библиотека. Но я решил исключить jar-файл kafka-client при выполнении этого импорта). Клиенты kafka, которые я использую, как я уже сказал, имеют более высокую версию, как и кластер kafka, к которому я подключаюсь.
Чтобы повторить это, я запустил тот же код на кластере искр, работающем на моем ноутбуке, и все работает нормально, где я могу использовать и распечатывать сообщения kafka.
Всем, кто сталкивался с подобной проблемой, пожалуйста, сообщите !!