Поглотить Secure Kafka от блока свечей данных - PullRequest
0 голосов
/ 05 сентября 2018

Я пытаюсь использовать защищенную тему Кафки (используя SASL_PLAINTEXT, метод ScramLogin).

Spark Версия 2.3.1 Scala 2.11 Кафка последний

Я использую структурированный поток Spark для его создания. Для этого я импортировал библиотеку: spark-sql-kafka-0-10_2.11-2.3.1

Импортирует старую версию (0.10.0.1) файла kafka-clients.jar.

Ниже мой client.jaas:

KafkaClient {
 org.apache.kafka.common.security.scram.ScramLoginModule required
 mechanism=SCRAM-SHA-512
 security.protocol=SASL_PLAINTEXT
 client.id="*****"
 username="****"
 password="*****";
};

Я использую ScramLoginModule, и указанный выше jar-файл kafka-client даже не имеет этого jar-файла. Итак, я добавил более позднюю версию jar-файла kafka Clients kafka-clients-1.1.1-cp1.jar

В моей записной книжке у меня есть следующий код:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._

val kafkaSource = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers",   "host:port")   
    .option("subscribe", "topic")    
  .option("startingOffsets", "earliest") 
  .option("maxOffsetsPerTrigger", 1)

  .option("kafka.sasl.mechanism","SCRAM-SHA-512")
  .option("kafka.security.protocol","SASL_PLAINTEXT")
  .load()


val kafkaStream = kafkaSource.
  select(
    $"key" cast "string",   // deserialize keys
    $"value" cast "string", // deserialize values
    $"topic",
    $"partition",
    $"offset",
    $"timestamp",
    $"timestampType")

import org.apache.spark.sql.ForeachWriter

kafkaStream.writeStream.foreach(new ForeachWriter[Row] {

  override def process(row: Row): Unit = {
    println("Processing : " + row.mkString(","))
  }

  override def close(errorOrNull: Throwable): Unit = {}

  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }
}).start().awaitTermination()

Из журналов драйверов видно, что я могу нормально подключиться.

18/09/05 13:09:46 INFO AbstractLogin: Successfully logged in.
18/09/05 13:09:46 INFO ConsumerConfig: ConsumerConfig values: 

Но после этого ничего не происходит !! Он просто застревает после печати следующих строк:

18/09/05 13:09:47 INFO AppInfoParser: Kafka version : 0.10.0.1
18/09/05 13:09:47 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5

Приведенная выше версия Kafka также очень сбивает с толку, так как я нигде не использую эту версию (хотя это была версия клиента kafka, которая автоматически импортируется при добавлении spark-sql-kafka-0-10_2.11-2.3 .1 библиотека. Но я решил исключить jar-файл kafka-client при выполнении этого импорта). Клиенты kafka, которые я использую, как я уже сказал, имеют более высокую версию, как и кластер kafka, к которому я подключаюсь.

Чтобы повторить это, я запустил тот же код на кластере искр, работающем на моем ноутбуке, и все работает нормально, где я могу использовать и распечатывать сообщения kafka.

Всем, кто сталкивался с подобной проблемой, пожалуйста, сообщите !!

...