Как получить прогресс потокового запроса после awaitTermination? - PullRequest
0 голосов
/ 30 января 2019

Я новичок в Spark и немного читал о мониторинге приложений Spark.По сути, я хочу знать, сколько записей было обработано приложением spark за заданное время запуска и ход выполнения запроса.Я знаю, что «lastProgress» дает все эти метрики, но когда я использую awaitTermination с «lastProgress», он всегда возвращает ноль.

 val q4s = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", checkpoint_loc)
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .start()

  println("Query Id: "+ q4s.id.toString())
  println("QUERY PROGRESS.........")
println(q4s.lastProgress);
q4s.awaitTermination();

Вывод:

Query Id: efd6bc15-f10c-4938-a1aa-c81fdb2b33e3
QUERY PROGRESS.........
null

Как получить прогрессмой запрос при использовании awaitTermination или как я могу поддерживать мой запрос в непрерывном режиме без использования awaitTermination?

Заранее спасибо.

1 Ответ

0 голосов
/ 30 января 2019

Вы должны запустить отдельный поток со ссылкой на потоковый запрос для отслеживания (скажем, q4s) и регулярно получать информацию о ходе выполнения.

Поток, запустивший запрос (основной поток вашего SparkПриложение структурированной потоковой передачи) обычно имеет значение awaitTermination, поэтому потоки демонов запускаемых потоковых запросов могут продолжать выполняться.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...