Spark Streaming awaitTermination в ноутбуке Jupyter - PullRequest
0 голосов
/ 25 апреля 2019

Я следую вместе с кодом в Apache Spark Definitive Guide.Я столкнулся с проблемой, когда следующий код не печатает результат в блокноте Jupyter, когда у меня есть закомментированная строка кода "awaitTermination ()".С включенным в код "awaitTermination ()" ядро ​​Jupyter занято и остается долгое время, возможно, неограниченное время.

Без "awaitTermination" код работает нормально.

Может кто-нибудь объяснить этоповедение.Как я мог это преодолеть?

static = spark.read.json(r"/resources/activity-data/")
dataSchema = static.schema
streaming = (spark
             .readStream
             .schema(dataSchema)
             .option("maxFilesPerTrigger", 1)
             .json(r"/resources/activity-data/")
            )
activityCounts = streaming.groupBy("gt").count()
spark.conf.set("spark.sql.shuffle.partitions", 5)
activityQuery = (activityCounts
                 .writeStream
                 .queryName("activity_counts")
                 .format("memory")
                 .outputMode("complete")
                 .start()
                )
#activityQuery.awaitTermination()
#activityQuery.stop()
from time import sleep
for x in range(5):
    spark.table("activity_counts").show()
    sleep(1)

1 Ответ

0 голосов
/ 25 апреля 2019

Да; пожалуйста, смотрите эту документацию как справочную (https://docs.databricks.com/spark/latest/structured-streaming/production.html), и страница 352 в Spark TDG также объясняет это.

Задания Spark Streaming являются непрерывными приложениями, и в рабочем состоянии activityQuery.awaitTermination () требуется, потому что это предотвращает завершение процесса драйвера, когда поток активен (в фоновом режиме).

Если драйвер уничтожен, значит, приложение также уничтожается, поэтому activityQuery.awaitTermination () похоже на отказоустойчивое. Если вы хотите отключить поток в Jupyter, вы можете запустить activityQuery.stop () , чтобы сбросить запрос в целях тестирования ... Надеюсь, это поможет.

activityDataSample = 'path/to/data'
spark.conf.set("spark.sql.shuffle.partitions", 8)
static = spark.read.json(activityDataSample)
dataSchema = static.schema
static.printSchema()

streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
.json(activityDataSample)

activityCounts = streaming.groupBy("gt").count()

activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format("memory").outputMode("complete")\
.start()

# simulates a continuous stream for testing (cntrl-C to kill app)
'''
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format("console").outputMode("complete")\
.start()
activityQuery.awaitTermination()
'''

spark.streams.active # query stream is active
[<pyspark.sql.streaming.StreamingQuery at 0x28a4308d320>]

from time import sleep
for x in range(3):
    spark.sql("select * from activity_counts").show(3)
    sleep(2)
+---+-----+
| gt|count|
+---+-----+
+---+-----+

+--------+-----+
|      gt|count|
+--------+-----+
|    bike|10796|
|    null|10449|
|stairsup|10452|
+--------+-----+
only showing top 3 rows

+--------+-----+
|      gt|count|
+--------+-----+
|    bike|10796|
|    null|10449|
|stairsup|10452|
+--------+-----+
only showing top 3 rows

activityQuery.stop() # stop query stream
spark.streams.active # no active streams anymore
[]
...