У меня есть агрегация, как показано ниже:
Я использую структурированную потоковую передачу для получения всего продукта идентификаторов и их ср.значение только если среднеепревышает определенный порог.
val aggregatedDataset = streamDataset
.select($"id", $"time", $"value")
.withWatermark("$time", "5 minutes")
.groupBy(window($"time", "10 minutes", "60 seconds"), $"id").agg(avg("value") as "avg")
.filter($"avg" > 10)
Затем он публикуется в Kafka.Однако я вижу несколько записей для одного и того же id с разным средним значением.значения, которые я думаю, потому что выход срабатывает после каждой партии.
Итак, есть ли способ инициировать вывод для окна только после того, как spark удостоверится в совокупности для этого окна?
Даже если я разрешу такие дубликаты записей в Кафке, как следуетслучаи должны быть обработаны:
среднее для определенного продукта id превысило порог , и результаты были опубликованы, но при просмотре большего количества записей avg фактически опустился ниже порога.Spark уже опубликовал предыдущий продукт id , но который фактически не должен был быть опубликован.