Я использую структурную потоковую интеграцию kafka для потоковой передачи данных из концентратора событий и печати в консоли, как показано в примере ниже, но я ничего не получаю на консоли, даже если я могу показать данные в консоли с помощью org.apache.spark.eventhubsAPI структурированной потоковой передачи.
import org.apache.spark.sql.kafka010._
val spark = SparkSession.builder()
.master("local[*]")
.appName("kafkaeventhubconsumer")
.getOrCreate()
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<EVENT_HUB_FQDN>:9093")
.option("subscribe", "<EVENT_HUB_NAME>")
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism" , "PLAIN")
.option("sasl.jaas.config", """org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<CONNECTION_STRING>";""")
.load()
df.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()