Spark Streaming Job считывает данные о событиях из тем Кафки, агрегирует на основе меток времени и производит подсчеты.
Теперь проблема в том, что входящие метки времени не в порядке.Они могут иметь +/- 5days
отличие от текущей отметки времени.
Это побеждает цель watermarking
.Есть ли другой способ сортировки и агрегации данных на некотором временном интервале?
//Read
Dataset<Row> stream = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "kafkaBootstrapServersString")
.option("subscribe", topic)
.option("startingOffsets", "latest")
.option("enable.auto.commit", false)
.load();
//Aggregate
stream
.select(
col("timestamp"),
col("platform")
)
.groupBy(
functions.window(col("timestamp"), "30 minutes"),
col("platform")
)
.agg(
count(lit(1)).as("count")
);
//Write
stream
.writeStream()
.outputMode(OutputMode.Complete())
.format("console")
.trigger(Trigger.ProcessingTime("30 minutes"))
.start();