Spark Streaming пишет в Кафку с задержкой - через x минут - PullRequest
1 голос
/ 06 июня 2019

У нас есть приложение Streaming Stream.Архитектура выглядит следующим образом:

Kinesis Spark to Kafka.

Приложение Spark использует qubole / kinesis-sql для структурированной потоковой передачи из Kinesis.Затем данные агрегируются и затем передаются в Kafka.

Наш сценарий использования требует задержки в 4 минуты перед отправкой в ​​Kafka.

Окно выполняется за 2 минуты и водяным знаком 4 минуты

val windowedCountsDF = messageDS
   .withWatermark("timestamp", "4 minutes")
   .groupBy(window($"timestamp", "2 minutes", "2 minutes"), $"id", $"eventType", $"topic")

Запись в Kafkaсрабатывает каждые две минуты

val eventFilteredQuery = windowedCountsDF
  .selectExpr("topic", "id as key", "to_json(struct(*)) AS value")
  .writeStream
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("checkpointLocation", checkPoint)
  .outputMode("update")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .queryName("events_kafka_stream")
  .start()

Я могу изменить время запуска в соответствии с окном, но все же некоторые события мгновенно передаются в кафку.

Есть ли способ отложить запись в Kafka через x минут после закрытия окна.

Спасибо

1 Ответ

1 голос
/ 06 июня 2019

Измените режим вывода с update на append (опция по умолчанию). Режим output запишет все обновленные строки в приемник, следовательно, если вы используете водяной знак или нет, это не имеет значения.

Тем не менее, в режиме append любые записи должны будут ждать пересечения водяного знака - что именно то, что вы хотите:

Режим добавления использует водяной знак для удаления старого состояния агрегации. Но вывод оконной агрегации задерживается на более поздний порог, указанный в withWatermark(), так как в соответствии с семантикой режимов строки могут быть добавлены в таблицу результатов только один раз после того, как они завершены (то есть после пересечения водяного знака).

...