Как перезапустить потоковый запрос pyspark из данных контрольной точки? - PullRequest
0 голосов
/ 07 декабря 2018

Я создаю потоковое приложение с использованием pyspark 2.2.0

Я могу создать потоковый запрос

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
      .builder \
      .appName("StreamingApp") \
      .getOrCreate()

staticDataFrame = spark.read.format("parquet")\
.option("inferSchema","true").load("processed/Nov18/")

staticSchema = staticDataFrame.schema
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger",1)\
.format("parquet")\
.load("processed/Nov18/")

daily_trs=streamingDataFrame.select("shift","date","time")
.groupBy("date","shift")\
.count("shift")

writer = df.writeStream\
   .format("parquet")\
   .option("path","data")\
   .option("checkpointLocation","data/checkpoints")\
   .queryName("streamingData")\
   .outputMode("append")

query = writer.start()
query.awaitTermination()

Запрос потоковый и любой дополнительный файл для обработки / обработкиNov18 "будет обработано и сохранено в" data / "

Если потоковая передача не удалась, я хочу перезапустить тот же запрос

Путь к решению

  1. Согласно официальной документации, я могу получить идентификатор, который можно использовать для перезапуска запроса https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html?highlight=streamingquery#pyspark.sql.streaming.StreamingQuery.id

  2. Модуль pyspark.streaming содержит класс StreamingContext с classmethod

    classmethod getActiveOrCreate (checkpointPath, setupFunc) https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.getOrCreate

можно ли как-то использовать эти методы?

Если у кого-нибудь есть пример использования готового к работе потокового приложения для справки

1 Ответ

0 голосов
/ 11 декабря 2018

Вы должны просто (пере) запустить приложение pyspark с доступным каталогом контрольных точек, а Spark Structured Streaming сделает все остальное.Никаких изменений не требуется.

Если у кого-нибудь есть пример использования готового к работе потокового приложения для справки?

Я бы спросил в списке рассылки Spark users .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...