У меня есть следующий код для чтения и обработки данных Kafka с использованием структурированной потоковой передачи
object ETLTest {
case class record(value: String, topic: String)
def main(args: Array[String]): Unit = {
run();
}
def run(): Unit = {
val spark = SparkSession
.builder
.appName("Test JOB")
.master("local[*]")
.getOrCreate()
val kafkaStreamingDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "...")
.option("subscribe", "...")
.option("failOnDataLoss", "false")
.option("startingOffsets","earliest")
.load()
.selectExpr("CAST(value as STRING)", "CAST(timestamp as STRING)","CAST(topic as STRING)")
val sdvWriter = new ForeachWriter[record] {
def open(partitionId: Long, version: Long): Boolean = {
true
}
def process(record: record) = {
println("record:: " + record)
}
def close(errorOrNull: Throwable): Unit = {}
}
val sdvDF = kafkaStreamingDF
.as[record]
.filter($"value".isNotNull)
// DOES NOT WORK
/*val query = sdvDF
.writeStream
.format("console")
.start()
.awaitTermination()*/
// WORKS
/*val query = sdvDF
.writeStream
.foreach(sdvWriter)
.start()
.awaitTermination()
*/
}
}
Я запускаю этот код из IntellijIdea IDE, и когда я использую foreach (sdvWriter), я вижу, что записи используютсяот Kafka, но когда я использую .writeStream.format ("console"), я не вижу никаких записей.Я предполагаю, что поток записи консоли поддерживает какую-то контрольную точку и предполагает, что он обработал все записи.Это тот случай?Я что-то упускаю здесь очевидное?