PySpark перезапустить SparkContext в случае сбоя - PullRequest
0 голосов
/ 22 мая 2018

Мне нужно вычислить несколько агрегатов для каждой таблицы в базе данных Hive.Мой код выглядит примерно так:

sc = SparkContext()
sqlContext = HiveContext(sc)

showtables_df = sqlContext.sql('show tables in my_db')
for onlinetable in showtables_df.select('tableName').rdd.collect():
    hive_table_name = onlinetable['tableName']
    try:
        table_df = sqlContext.sql('SELECT * FROM my_db.' + hive_table_name)
        table_df.count() # sample action
    except Exception as e:
        logger.error('Error in table ' + hive_table_name)
        logger.error(type(e))
        logger.error(e)

В определенный момент чтение из огромной таблицы вызывает исключение, и SparkContext закрывается.С этого момента каждый вызов sqlContext завершается ошибкой:

[2018-05-20 07:40:36] ERROR - my_table: An error occurred while calling o40.sql.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
py4j.Gateway.invoke(Gateway.java:214)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
py4j.GatewayConnection.run(GatewayConnection.java:209)
java.lang.Thread.run(Thread.java:745)

The currently active SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
py4j.Gateway.invoke(Gateway.java:214)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
py4j.GatewayConnection.run(GatewayConnection.java:209)
java.lang.Thread.run(Thread.java:745)

        at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:106)
        at org.apache.spark.SparkContext$$anonfun$parallelize$1.apply(SparkContext.scala:729)
        at org.apache.spark.SparkContext$$anonfun$parallelize$1.apply(SparkContext.scala:728)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
        at org.apache.spark.SparkContext.parallelize(SparkContext.scala:728)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:145)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
        at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)

На данный момент я не могу добавить ресурсы к своей работе, но я бы хотел перезапустить SparkContext и продолжить вычисленияагрегации в остальных таблицах цикла for.

Является ли хорошей (и выполнимой) идеей проверить, что SparkContext должен быть закрыт в обработчике исключений, и в конечном итоге воссоздает его с помощью:

...
except Exception as e:
    logger.error('Error in table ' + hive_table_name)
    logger.error(type(e))
    logger.error(e)
    if sc._jsc.sc().isStopped():
        sc = SparkContext()
        sqlContext = HiveContext(sc)

Я работаю с Spark версии 1.6.1, выполняю задания с - master yarn --deploy-mode client

...