Остановите Spark Session через некоторое время - Pyspark - PullRequest
0 голосов
/ 25 февраля 2020

Я делаю ETL в искре, что иногда занимает много времени. Я хочу изящно завершить сеанс спарка через определенное время.

Я пишу свой код в Pyspark.

try:
 df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output)
exception:
 spark.stop()

Я хотел бы через некоторое время прекратить спарк в приведенном выше коде.

Есть ли способ изящно завершить сеанс зажигания через некоторое время ??

1 Ответ

1 голос
/ 01 марта 2020

Я бы предложил использовать официальный python Таймер , чтобы изящно остановить сеанс Spark:

import threading

def timer_elapsed():
    print('Timer elapsed')
    if not sc._jsc.sc().isStopped():
      spark.stop()

# wait for 0.5 sec for Spark job to complete
spark_timer = threading.Timer(0.5, timer_elapsed)
spark_timer.start()

try:
  df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output
  print('Spark job finished successfully.')
except Exception as e:
  spark_timer.cancel() # stop timer, we don't need to wait if error occured
  if not sc._jsc.sc().isStopped():
    spark.stop()

Примечание: Мы остановим сеанс в двух случаях если время истекло или исключение было поймано. Прежде чем запросить остановку контекста Spark, мы проверяем, активен ли контекст с sc._jsc.sc().isStopped, который напрямую вызывает API Java.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...