структура потокового демо, программа выхода сразу как пакетная работа, а не непрерывная, искра - PullRequest
0 голосов
/ 06 марта 2020

Я пишу демонстрацию для потоковой структуры Spark, источник и приемник предназначены только для тестирования в локальном режиме

   val spark=SparkSession.builder().master("local").getOrCreate()
   val stream=spark.readStream.format("rate").option("rowsPerSecond",2).option("rampUpTime",60).load()
   stream.writeStream.format("memory").queryName("rate2").start()
   spark.sql("select * from rate2").show  
   spark.stop

Когда я запускаю его, я просто вижу этот вывод в консоли и весь JVM сразу после него завершается.

+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

Поскольку потоковая передача непрерывна, я думаю, что она должна печатать записи каждую секунду, программа не должна останавливаться или выходить, но если она не останавливается, как может быть код запроса выполняется: мы должны использовать многопоточность или многопоточность?

Но даже если я просто выполню метод запуска на фрейме данных, он не будет там зависать.

Так что же является правильным способом непрерывно запускать потоковый процесс, а затем запрашивать его позже и позже?

следуйте ответу Майка, я улучшаю код:

   val spark=SparkSession.builder().master("local").getOrCreate()
   spark.sparkContext.setLogLevel("error")
   val stream=spark.readStream.format("rate").option("rowsPerSecond",1).option("rampUpTime",1).load()
   val query=stream.writeStream.format("memory").queryName("rate2").start()   
   spark.sql("select 1,* from rate2").show //empty table shown
   Thread.sleep(1000)
   spark.sql("select 2,* from rate2").show //empty table shown
   Thread.sleep(5000)
   spark.sql("select 3,* from rate2").show //some rows shown
   query.awaitTermination(180000)
//   spark.sql("select * from rate2").show  //no where to execute,not reachable
//   spark.stop

формат консоли лучше

1 Ответ

0 голосов
/ 06 марта 2020

Ваша JVM (SparkSession) завершается, потому что вы звоните spark.stop.

Вместо того, чтобы останавливать SparkSession, вы должны скорее дождаться сигнала завершения (может быть Ctrl + C или SIGTERM) до

stream.awaitTermination()

См. Пример в Документация Spark .

...