Я создаю потоковое приложение с использованием pyspark 2.2.0
Я могу создать потоковый запрос
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("StreamingApp") \
.getOrCreate()
staticDataFrame = spark.read.format("parquet")\
.option("inferSchema","true").load("processed/Nov18/")
staticSchema = staticDataFrame.schema
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger",1)\
.format("parquet")\
.load("processed/Nov18/")
daily_trs=streamingDataFrame.select("shift","date","time")
.groupBy("date","shift")\
.count("shift")
writer = df.writeStream\
.format("parquet")\
.option("path","data")\
.option("checkpointLocation","data/checkpoints")\
.queryName("streamingData")\
.outputMode("append")
query = writer.start()
query.awaitTermination()
Запрос потоковый и любой дополнительный файл для обработки / обработкиNov18 "будет обработано и сохранено в" data / "
Если потоковая передача не удалась, я хочу перезапустить тот же запрос
Путь к решению
Согласно официальной документации, я могу получить идентификатор, который можно использовать для перезапуска запроса https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html?highlight=streamingquery#pyspark.sql.streaming.StreamingQuery.id
Модуль pyspark.streaming содержит класс StreamingContext с classmethod
classmethod getActiveOrCreate (checkpointPath, setupFunc) https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.getOrCreate
можно ли как-то использовать эти методы?
Если у кого-нибудь есть пример использования готового к работе потокового приложения для справки