У нас есть приложение Streaming Stream.Архитектура выглядит следующим образом:
Kinesis Spark to Kafka.
Приложение Spark использует qubole / kinesis-sql для структурированной потоковой передачи из Kinesis.Затем данные агрегируются и затем передаются в Kafka.
Наш сценарий использования требует задержки в 4 минуты перед отправкой в Kafka.
Окно выполняется за 2 минуты и водяным знаком 4 минуты
val windowedCountsDF = messageDS
.withWatermark("timestamp", "4 minutes")
.groupBy(window($"timestamp", "2 minutes", "2 minutes"), $"id", $"eventType", $"topic")
Запись в Kafkaсрабатывает каждые две минуты
val eventFilteredQuery = windowedCountsDF
.selectExpr("topic", "id as key", "to_json(struct(*)) AS value")
.writeStream
.trigger(Trigger.ProcessingTime("2 minutes"))
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("checkpointLocation", checkPoint)
.outputMode("update")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.queryName("events_kafka_stream")
.start()
Я могу изменить время запуска в соответствии с окном, но все же некоторые события мгновенно передаются в кафку.
Есть ли способ отложить запись в Kafka через x минут после закрытия окна.
Спасибо