Ниже приведён код, который я запускаю в PySpark; по указанному пути к набору данных нет файла в формате JSON.
from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
# Shared SparkSession for this script. Per the PySpark builder docs,
# getOrCreate() returns an already-active session if one exists.
spark = (
    SparkSession.builder
    .appName('abc')
    .getOrCreate()
)
def _java_error_message(exc):
    """Return a one-line summary of the JVM error carried by *exc*.

    Prefers the Java-side message (``exc.java_exception.getMessage()``),
    which omits the Python and JVM stack traces that ``str(exc)`` appends.
    Falls back to ``str(exc)`` when that message is unavailable.
    """
    try:
        message = exc.java_exception.getMessage()
    except Exception:  # NOTE: no java_exception attribute, or gateway call failed
        message = None
    if not message:  # Java null comes back as None; also covers empty text
        message = str(exc)
    # The Java message may itself span several lines (Spark stage-failure
    # messages can embed the executor-side trace), so keep only line one.
    lines = message.strip().splitlines()
    return lines[0] if lines else message


def read_dataset(spark, dataset_path, format):
    """Load *dataset_path* through ``spark.read`` using data source *format*.

    Args:
        spark: SparkSession whose ``read`` reader is used.
        dataset_path: Path handed to ``DataFrameReader.load``.
        format: Data source name handed to ``DataFrameReader.format``
            (e.g. ``'json'``). It shadows the builtin ``format`` but is kept
            because callers pass it by keyword.

    Returns:
        The loaded DataFrame. If the JVM raised a ``Py4JJavaError``, returns
        ``None`` after printing only a concise error message instead of the
        full Java stack trace.
    """
    try:
        spark_dataset = spark.read.format(format).load(dataset_path)
    except Py4JJavaError as e:
        # str(e) would dump the whole JVM trace; print just the message.
        print(_java_error_message(e))
        return None
    return spark_dataset
# Try to load 'xyz' as JSON; on a Py4JJavaError read_dataset prints the
# error instead of raising, and the result (None in that case) is discarded.
read_dataset(spark, dataset_path='xyz', format='json')
В итоге после тайм-аута выбрасывается исключение SparkException. Как перехватить его и получить только сообщение об ошибке, без полной трассировки стека?
An error occurred while calling o4620.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 110 in stage 5.0 failed 10 times, most recent failure: Lost task 110.9 in stage 5.0 (TID 4478, executor 8): java.io.CharConversionException: Invalid UTF-32 character 0x40498000(above 10ffff) at char #2, byte #11)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:189)
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:150)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2017)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:577)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1$$anonfun$apply$3.apply(JsonInferSchema.scala:56)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1$$anonfun$apply$1$$anonfun$apply$3.apply(JsonInferSchema.scala:55)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1892)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1880)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1879)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:108)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.infer(JsonDataSource.scala:98)
at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:64)
at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:59)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:179)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
.....
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more