У меня есть задание потоковой передачи, которое считывает данные из нескольких тем Кафки.Теперь я хочу объединить данные по нескольким window intervals
и сохранить их в базе данных.
Возможно ли так?Или мне понадобится отдельное задание зажигания для выполнения другого уровня агрегации.
Потеряны ли данные при обработке набора данных во второй раз?
Агрегация 1:
private void buildStream1(Dataset<Row> sourceDataset) {
Dataset<Row> query = sourceDataset
.withWatermark("timestamp", "120 seconds")
.select(
col("timestamp"),
col("datacenter"),
col("platform")
)
.groupBy(
functions.window(col("timestamp"), 120 seconds, 60 seconds).as("timestamp"),
col("datacenter"),
col("platform")
)
.agg(
count(lit(1)).as("count")
);
startKafkaStream(query);
}
Агрегация2:
private void buildStream1(Dataset<Row> sourceDataset) {
Dataset<Row> query = sourceDataset
.withWatermark("timestamp", "10 minutes")
.select(
col("timestamp"),
col("datacenter"),
col("platform")
)
.groupBy(
functions.window(col("timestamp"), 10 minutes, 5 minutes).as("timestamp"),
col("datacenter"),
col("platform")
)
.agg(
count(lit(1)).as("count")
);
startKafkaStream(query);
}
Запись обоих потоков:
private void startKafkaStream(Dataset<Row> aggregatedDataset) {
aggregatedDataset
.select(to_json(struct("*")).as("value"))
.writeStream()
.outputMode(OutputMode.Append())
.option("truncate", false)
.format("console")
.trigger(Trigger.ProcessingTime(10 minutes))
.start();
}