Я бы предложил использовать официальный python Таймер , чтобы изящно остановить сеанс Spark:
import threading
def timer_elapsed():
print('Timer elapsed')
if not sc._jsc.sc().isStopped():
spark.stop()
# wait for 0.5 sec for Spark job to complete
spark_timer = threading.Timer(0.5, timer_elapsed)
spark_timer.start()
try:
df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output
print('Spark job finished successfully.')
except Exception as e:
spark_timer.cancel() # stop timer, we don't need to wait if error occured
if not sc._jsc.sc().isStopped():
spark.stop()
Примечание: Мы остановим сеанс в двух случаях если время истекло или исключение было поймано. Прежде чем запросить остановку контекста Spark, мы проверяем, активен ли контекст с sc._jsc.sc().isStopped
, который напрямую вызывает API Java.