Да; пожалуйста, смотрите эту документацию как справочную (https://docs.databricks.com/spark/latest/structured-streaming/production.html), и страница 352 в Spark TDG также объясняет это.
Задания Spark Streaming являются непрерывными приложениями, и в рабочем состоянии activityQuery.awaitTermination () требуется, потому что это предотвращает завершение процесса драйвера, когда поток активен (в фоновом режиме).
Если драйвер уничтожен, значит, приложение также уничтожается, поэтому activityQuery.awaitTermination () похоже на отказоустойчивое. Если вы хотите отключить поток в Jupyter, вы можете запустить activityQuery.stop () , чтобы сбросить запрос в целях тестирования ... Надеюсь, это поможет.
activityDataSample = 'path/to/data'
spark.conf.set("spark.sql.shuffle.partitions", 8)
static = spark.read.json(activityDataSample)
dataSchema = static.schema
static.printSchema()
streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
.json(activityDataSample)
activityCounts = streaming.groupBy("gt").count()
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format("memory").outputMode("complete")\
.start()
# simulates a continuous stream for testing (cntrl-C to kill app)
'''
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format("console").outputMode("complete")\
.start()
activityQuery.awaitTermination()
'''
spark.streams.active # query stream is active
[<pyspark.sql.streaming.StreamingQuery at 0x28a4308d320>]
from time import sleep
for x in range(3):
spark.sql("select * from activity_counts").show(3)
sleep(2)
+---+-----+
| gt|count|
+---+-----+
+---+-----+
+--------+-----+
| gt|count|
+--------+-----+
| bike|10796|
| null|10449|
|stairsup|10452|
+--------+-----+
only showing top 3 rows
+--------+-----+
| gt|count|
+--------+-----+
| bike|10796|
| null|10449|
|stairsup|10452|
+--------+-----+
only showing top 3 rows
activityQuery.stop() # stop query stream
spark.streams.active # no active streams anymore
[]