В соответствии с документацией о потоковой потоковой структуризации в https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html, строки из некоторых данных промежуточного состояния добавляются в таблицу результатов после обработки запроса потоковой передачи, и размер таблицы результатов продолжает увеличиваться с каждым входящим партия.
Я могу проверить это, распечатав также размер результирующего набора данных.
private static Map<String,String> map = new HashMap<>();
static{
map.put("mode", "FAILFAST");
map.put("kafka.bootstrap.servers","localhost:9092");
map.put("subscribe","test5");
map.put("startingOffsets", "earliest");
map.put("maxOffsetsPerTrigger","100");
}
public void exec(SparkSession sparkSession){
Dataset<Row> dataSet= sparkSession.readStream().format("kafka").options(map).load();
dataSet=dataSet.selectExpr("CAST(key AS STRING)");
Dataset<Row> countQuery=receievedMessageDataset.selectExpr("COUNT(key)");
StreamingQuery sq1= countQuery.writeStream().format("console").outputMode("append").start();
try{
sq1.awaitTermination(10000);
}catch (Exception e){
e.printStackTrace();
}
}
-------------------------------------------
Batch: 0
-------------------------------------------
+----------+
|count(key)|
+----------+
| 99|
+----------+
-------------------------------------------
Batch: 1
-------------------------------------------
+----------+
|count(key)|
+----------+
| 198|
+----------+
Я работаю над проектом, который читает данные из потокового источника, выполняет некоторые преобразования только для текущего пакетного триггера и публикует их. Поэтому я хочу ограничить данные таблицы результатов только обработанным выводом текущего пакета обработки и не должен содержать содержимое предыдущего пакета, поскольку это также может быть проблемой, если размер таблицы результатов превышает доступный объем памяти. Как я могу добиться этого поведения?