Я создал задание структурированной потоковой передачи, которое выглядит так, как указано ниже. На первом этапе я добавил только один файл в INPUT_DIRECTORY, и он работал нормально. Насколько я понимаю, если новый файл EXACT SAME будет добавлен в тот же каталог, количество будет удвоено, потому что я использую режим «complete». Но этого не происходит. Мое понимание «завершенного» режима неверно? Вот код:
Dataset<StoreSales> storeSalesStream = spark
.readStream()
.schema(storeSalesSchema)
.csv(INPUT_DIRECTORY)
.as(Encoders.bean(StoreSales.class));
//When data arrives from the stream, these steps will get executed
//4 - Create a temporary table so we can use SQL queries
storeSalesStream.createOrReplaceTempView("storeSales");
String sql = "SELECT AVG(ss_quantity) as average_quantity, count(*) as cnt, ss_store_sk FROM storeSales GROUP BY ss_store_sk order by ss_store_sk";
Dataset<Row> ageAverage = spark.sql(sql);
//5 - Write the the output of the query to the console
StreamingQuery query = ageAverage.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
Вывод с одним файлом:
+------------------+------+-----------+
| average_quantity| cnt|ss_store_sk|
+------------------+------+-----------+
| 50.60551037038176|130035| null|
|50.550020846689414|456896| 1|
| 50.5936442439659|458159| 2|
| 50.43842163027273|458272| 4|
| 50.55502265092984|458194| 7|
| 50.47613176523048|459357| 8|
| 50.47919908681093|459492| 10|
+------------------+------+-----------+
Вывод после копирования нового (точно такого же) файла:
+------------------+------+-----------+
| average_quantity| cnt|ss_store_sk|
+------------------+------+-----------+
| 50.63186245101668|156491| null|
| 50.54070128518595|549167| 1|
|50.600774270842244|550477| 2|
| 50.46126613833389|550604| 4|
| 50.57143520798298|551066| 7|
| 50.46779475780309|552865| 8|
| 50.46881408984539|552466| 10|
+------------------+------+-----------+