How can we catch SparkException in PySpark?
0 votes
/ April 30, 2020

This is the code I am running in PySpark, but there is no JSON-format file at the specified dataset path.

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()

def read_dataset(spark, dataset_path, format):
    try:
        spark_dataset = spark.read.format(format).load(dataset_path)
        return spark_dataset
    except Py4JJavaError as e:
        # prints the full Py4J error, including the entire JVM stack trace
        print(e)

read_dataset(spark, 'xyz', format='json')

So after the timeout a SparkException is raised. How can I capture just the error message from the raised error, rather than the whole stack trace below?

An error occurred while calling o4620.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 110 in stage 5.0 failed 10 times, most recent failure: Lost task 110.9 in stage 5.0 (TID 4478, executor 8): java.io.CharConversionException: Invalid UTF-32 character 0x40498000(above 10ffff)  at char #2, byte #11)
    at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:189)
    at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:150)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2017)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:577)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1$$anonfun$apply$3.apply(JsonInferSchema.scala:56)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1$$anonfun$apply$3.apply(JsonInferSchema.scala:55)
    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
    at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
    at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
    at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1892)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1880)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1879)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:108)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.infer(JsonDataSource.scala:98)
    at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:64)
    at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:59)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:179)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    .....
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
...
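One possible approach, sketched here and not taken from the original question: Py4JJavaError exposes the underlying JVM throwable as e.java_exception, so getMessage() returns only the message text without the driver stack trace, and getClass().getName() lets you check whether it is specifically an org.apache.spark.SparkException. In this sketch the parameter is renamed from format to fmt only to avoid shadowing the Python built-in; the rest mirrors the question's code.

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()

def read_dataset(spark, dataset_path, fmt):
    try:
        return spark.read.format(fmt).load(dataset_path)
    except Py4JJavaError as e:
        # e.java_exception is the wrapped JVM Throwable; getMessage()
        # yields just the message line (e.g. "Job aborted due to stage
        # failure: ..."), not the full stack trace printed by str(e).
        java_exc = e.java_exception
        if java_exc.getClass().getName() == 'org.apache.spark.SparkException':
            print(java_exc.getMessage())
        else:
            raise  # not a SparkException; re-raise unchanged

read_dataset(spark, 'xyz', fmt='json')

Note that PySpark only converts a few JVM exceptions (such as AnalysisException) into Python-side exceptions; a SparkException thrown during load() surfaces as a Py4JJavaError, which is why the class-name check above is done on the Java side.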