Я использую Spark 2.4.3.
Данные о позднем прибытии не были отклонены.
Я использую структурированную потоковую передачу для чтения файлов из папки. Я ожидаю, что данные о позднем прибытии могут быть исключены, но они все еще учитываются.
case class InputRow(...)
val df = spark.readStream
.format("csv")
.option("delimiter", ",")
.schema(schema)
.csv("/somefolder/")
...
df.as[InputRow]
.withWatermark("eventtime", "5 seconds")
.groupBy($"eventid")
.count()
.writeStream.format("console")
.trigger(Trigger.ProcessingTime("3 seconds"))
.option("truncate", false)
.outputMode("update")
.start
.awaitTermination()
Первый раз я помещаю файл с одной строкой, например: 2019-08-04 10: 10: 00,1,3328, c1,10,1000
Eventid - 3328Вот. Вывод:
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+
|eventid|count|
+-------+-----+
|3328 |1 |
+-------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-----+
|eventid|count|
+-------+-----+
+-------+-----+
Затем я помещаю другой файл с одной строкой, например: 2019-08-02 10: 10: 00,1,3328, c1,10,1000 Eventid такой же,но время события меняется на 8/2, что явно не соответствует водяному знаку (максимальное время пока - 5 секунд), но вывод:
-------------------------------------------
Batch: 2
-------------------------------------------
+-------+-----+
|eventid|count|
+-------+-----+
|3328 |2 |
+-------+-----+
Итак, вопросы:
- Для первого файла, почему есть 2 пакета, а один пакет пуст?
- Почему данные о позднем прибытии из водяного знака не отклоняются и все еще агрегируются?
Спасибо!