Я пытаюсь перенести мои API DStream в затрудненную потоковую передачу и сталкиваюсь с тем, как ожидать или не в состоянии соотнести микробатирование со структурированной потоковой передачей.
В приведенном ниже коде я создаю прямой поток и ожидаю вечно, чтобы я мог бесконечно потреблять сообщения kafka.
Как мне добиться того же в структурированной потоковой передаче?
хватит ли sparkSession.streams.awaitAnyTermination?
Я поместил пример кода ниже как в потоковую, так и в структурированную потоковую передачу. Любые указатели были бы очень полезны. Спасибо
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"max.poll.records" -> "1",
"group.id" -> "test",
"enable.auto.commit" -> (true: java.lang.Boolean))
val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(10))
val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,Subscribe[String, String]("mytopic",kafkaParams))
performRddComputation(stream, sparkSession)
ssc.start()
ssc.awaitTermination()
Структурированный потоковый эквивалент
val df = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafkfa.offset.strategy","latest")
.option("subscribe", "mytopic")
.load()
df.printSchema()
val tdf = df.selectExpr("CAST(value AS STRING)").as[String].select("value").writeStream.format("console")
.option("truncate","false")
.start()
tdf.map(record => {//do something})
sparkSession.streams.awaitAnyTermination