как остановить SparkContext, который создается внутри функции? - PullRequest
0 голосов
/ 13 февраля 2019

У меня есть функция для создания SparkContext и SQLContext.

 def init_spark(query=None):
     def quiet_logs( sc ):
       logger = sc._jvm.org.apache.log4j
       logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
       logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )

     if 'sc' in locals():
         sc.stop()
     conf = pyspark.SparkConf()
     conf.set("spark.driver.allowMultipleContexts", "true")
     conf.set("es.index.auto.create", "true")
     conf.set("es.nodes.discovery", "true")
     conf.set("es.read.field.exclude", "data.integrations")

     if query:
         conf.set("es.query", query)

     sc = pyspark.SparkContext(conf=conf)
     quiet_logs(sc)

     sqlContext = pyspark.SQLContext(sc)
     return sqlContext

очевидно, проверка sc.stop () не работает, и я попытался сделать это вне функции, все еще не работает ..

1 Ответ

0 голосов
/ 13 февраля 2019

Может быть, посмотрите на getOrCreate и newSession , вы можете сначала остановить текущую, а затем создать новую из конфигурации, так что вам не придется делать Pythonуровень проверки.

# Stop whatever you are getting here
SparkSession.builder.getOrCreate().stop()

# New session with your config
spark = SparkSession.builder.config(conf=conf).newSession()
sc = spark.sparkContext

Теперь первая часть кажется неэффективной (если не было сессии, она будет создана, а затем уничтожена), не знаю, какие накладные расходы это вызывает.

edit: Или потяните sc наружу и проверьте, назначено ли оно

sc = None

def init_spark(query=None):

  global sc

  if sc is not None:
    sc.stop()

  ...

  sc = pyspark.SparkContext(conf=conf)

  ...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...