Spark потоковая агрегация обрабатывает пропущенные строки - PullRequest
0 голосов
/ 13 сентября 2018

У меня есть потоковое задание Spark, которое объединяет строки, используя идентификатор и временное окно.Мой вопрос заключается в том, что если поток ранее получал строки с определенным идентификатором, а затем, в течение более длительного периода времени, чем тот, который использовался для агрегации, нет больше агрегированных строк для этого идентификатора?У меня не будет строки, связанной с этим идентификатором и временными окнами?Если да, есть ли способ сохранить эту информацию?Нужно ли вести список активных идентификаторов?

Пример:

 Minutes sequence number :              1-2-3-4-5-6-7-8-9-10
 Number of row with id = "1" :          2-4-0-0-0-0-0-0-3-5
                                            \_________/
                                No row with id "1" during this time window

Примечание: цель этого - вычисление времени простоя.Есть ли способ обработки простоя и обнаружения повторной активации в потоковой передаче?Или я должен сделать это, используя периодическое пакетное задание?

val stream = spark.readStream...

val aggregationExpr: Array[Column] = avg(col("col1")) ++ sum(col("col2")) ++ count(lit(1)))

stream.withWatermark("timestamp", "10 minute")
      .groupBy(
        col("device_id"),
        window(col("timestamp"), "5 minute", "1 minute"))
      .agg(aggregationExpr.head, aggregationExpr.tail:_*)

Дополнительный глупый вопрос, а что, если пороговое значение водяного знака меньше, чем временное окно, используемое для агрегирования?Реальный порог будет суммой обоих?Или это просто выбросит исключение?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...