Я пишу демонстрацию для потоковой структуры Spark, источник и приемник предназначены только для тестирования в локальном режиме
val spark=SparkSession.builder().master("local").getOrCreate()
val stream=spark.readStream.format("rate").option("rowsPerSecond",2).option("rampUpTime",60).load()
stream.writeStream.format("memory").queryName("rate2").start()
spark.sql("select * from rate2").show
spark.stop
Когда я запускаю его, я просто вижу этот вывод в консоли и весь JVM сразу после него завершается.
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+
Поскольку потоковая передача непрерывна, я думаю, что она должна печатать записи каждую секунду, программа не должна останавливаться или выходить, но если она не останавливается, как может быть код запроса выполняется: мы должны использовать многопоточность или многопоточность?
Но даже если я просто выполню метод запуска на фрейме данных, он не будет там зависать.
Так что же является правильным способом непрерывно запускать потоковый процесс, а затем запрашивать его позже и позже?
следуйте ответу Майка, я улучшаю код:
val spark=SparkSession.builder().master("local").getOrCreate()
spark.sparkContext.setLogLevel("error")
val stream=spark.readStream.format("rate").option("rowsPerSecond",1).option("rampUpTime",1).load()
val query=stream.writeStream.format("memory").queryName("rate2").start()
spark.sql("select 1,* from rate2").show //empty table shown
Thread.sleep(1000)
spark.sql("select 2,* from rate2").show //empty table shown
Thread.sleep(5000)
spark.sql("select 3,* from rate2").show //some rows shown
query.awaitTermination(180000)
// spark.sql("select * from rate2").show //no where to execute,not reachable
// spark.stop
формат консоли лучше