Как ограничить размер таблицы результирующего набора данных / таблицы данных только текущим входящим триггером в Spark? - PullRequest
0 голосов
/ 10 января 2019

В соответствии с документацией о потоковой потоковой структуризации в https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html, строки из некоторых данных промежуточного состояния добавляются в таблицу результатов после обработки запроса потоковой передачи, и размер таблицы результатов продолжает увеличиваться с каждым входящим партия.

Я могу проверить это, распечатав также размер результирующего набора данных.


    private static Map<String,String> map = new HashMap<>();
    static{
            map.put("mode", "FAILFAST");
            map.put("kafka.bootstrap.servers","localhost:9092");
            map.put("subscribe","test5");
            map.put("startingOffsets", "earliest");
            map.put("maxOffsetsPerTrigger","100");
    }
    public void exec(SparkSession sparkSession){
            Dataset<Row> dataSet= sparkSession.readStream().format("kafka").options(map).load();
            dataSet=dataSet.selectExpr("CAST(key AS STRING)");
            Dataset<Row> countQuery=receievedMessageDataset.selectExpr("COUNT(key)");
            StreamingQuery sq1= countQuery.writeStream().format("console").outputMode("append").start();
            try{
                sq1.awaitTermination(10000);
            }catch (Exception e){
                e.printStackTrace();


}    
}

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+
|count(key)|
+----------+
|        99|
+----------+
-------------------------------------------
Batch: 1
-------------------------------------------
+----------+
|count(key)|
+----------+
|       198|
+----------+

Я работаю над проектом, который читает данные из потокового источника, выполняет некоторые преобразования только для текущего пакетного триггера и публикует их. Поэтому я хочу ограничить данные таблицы результатов только обработанным выводом текущего пакета обработки и не должен содержать содержимое предыдущего пакета, поскольку это также может быть проблемой, если размер таблицы результатов превышает доступный объем памяти. Как я могу добиться этого поведения?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...