Ниже показано, как я читаю данные из kafka.
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.option("startingOffsets", """{"topic1":{"1":-1}}""")
.load()
val df = inputDf.selectExpr("CAST(value AS STRING)","CAST(topic AS STRING)","CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")
Как получить максимальные и минимальные смещения и временную метку с указанного выше кадра данных?Я хочу сохранить его в каком-то внешнем источнике для дальнейшего использования. Я не могу использовать функцию 'agg', так как пишу тот же кадр данных для записи (как показано ниже)
val kafkaOutput = df.writeStream
.outputMode("append")
.option("path", "/warehouse/download/data1")
.format("console")
.option("checkpointLocation", checkpoint_loc)
.start()
.awaitTermination()