Почему событие позднего прибытия из водяного знака не отклонено - PullRequest
0 голосов
/ 01 октября 2019

Я использую Spark 2.4.3.

Данные о позднем прибытии не были отклонены.

Я использую структурированную потоковую передачу для чтения файлов из папки. Я ожидаю, что данные о позднем прибытии могут быть исключены, но они все еще учитываются.

case class InputRow(...)

val df = spark.readStream
      .format("csv")
      .option("delimiter", ",")
      .schema(schema)
      .csv("/somefolder/")
... 
df.as[InputRow]
      .withWatermark("eventtime", "5 seconds")
      .groupBy($"eventid")
      .count()
      .writeStream.format("console")
      .trigger(Trigger.ProcessingTime("3 seconds"))
      .option("truncate", false)
      .outputMode("update")
      .start
      .awaitTermination()

Первый раз я помещаю файл с одной строкой, например: 2019-08-04 10: 10: 00,1,3328, c1,10,1000

Eventid - 3328Вот. Вывод:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+
|eventid|count|
+-------+-----+
|3328   |1    |
+-------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-----+
|eventid|count|
+-------+-----+
+-------+-----+

Затем я помещаю другой файл с одной строкой, например: 2019-08-02 10: 10: 00,1,3328, c1,10,1000 Eventid такой же,но время события меняется на 8/2, что явно не соответствует водяному знаку (максимальное время пока - 5 секунд), но вывод:

-------------------------------------------
Batch: 2
-------------------------------------------
+-------+-----+
|eventid|count|
+-------+-----+
|3328   |2    |
+-------+-----+

Итак, вопросы:

  1. Для первого файла, почему есть 2 пакета, а один пакет пуст?
  2. Почему данные о позднем прибытии из водяного знака не отклоняются и все еще агрегируются?

Спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...