У меня есть потоковое задание Spark, которое объединяет строки, используя идентификатор и временное окно.Мой вопрос заключается в том, что если поток ранее получал строки с определенным идентификатором, а затем, в течение более длительного периода времени, чем тот, который использовался для агрегации, нет больше агрегированных строк для этого идентификатора?У меня не будет строки, связанной с этим идентификатором и временными окнами?Если да, есть ли способ сохранить эту информацию?Нужно ли вести список активных идентификаторов?
Пример:
Minutes sequence number : 1-2-3-4-5-6-7-8-9-10
Number of row with id = "1" : 2-4-0-0-0-0-0-0-3-5
\_________/
No row with id "1" during this time window
Примечание: цель этого - вычисление времени простоя.Есть ли способ обработки простоя и обнаружения повторной активации в потоковой передаче?Или я должен сделать это, используя периодическое пакетное задание?
val stream = spark.readStream...
val aggregationExpr: Array[Column] = avg(col("col1")) ++ sum(col("col2")) ++ count(lit(1)))
stream.withWatermark("timestamp", "10 minute")
.groupBy(
col("device_id"),
window(col("timestamp"), "5 minute", "1 minute"))
.agg(aggregationExpr.head, aggregationExpr.tail:_*)
Дополнительный глупый вопрос, а что, если пороговое значение водяного знака меньше, чем временное окно, используемое для агрегирования?Реальный порог будет суммой обоих?Или это просто выбросит исключение?