This is the code I am running in PySpark, where I try to catch the error message from the exception that occurs:
from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()

def read_dataset(spark, dataset_path):
    try:
        spark_dataset = spark.read.format('ORC').load(dataset_path)
        return spark_dataset
    except Py4JJavaError as e:
        print(e)

read_dataset(spark, 'xyz')
The error message:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
/opt/conda/anaconda/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
Py4JJavaError: An error occurred while calling o1511.load.
: org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://xyz;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
.....
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
AnalysisException Traceback (most recent call last)
<ipython-input-50-af5d4c825875> in <module>
----> 1 read_dataset(sparksession, 'xyz')
<ipython-input-46-ea8a8a5138d6> in read_dataset(self, dataset_gcs_path)
3 def read_dataset(self, dataset_gcs_path):
4 try:
----> 5 spark_dataset = spark.read.format('ORC').load(dataset_gcs_path)
/usr/lib/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
164 self.options(**options)
165 if isinstance(path, basestring):
--> 166 return self._df(self._jreader.load(path))
167 elif path is not None:
168 if type(path) != list:
/opt/conda/anaconda/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: 'Path does not exist: xyz;'
I just want to catch the message "AnalysisException: 'Path does not exist: xyz;'" from the printed error. Is there a way to do this, or can someone help me with it?
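For reference, this is the direction I was thinking of trying. It is only a minimal sketch: I am assuming that AnalysisException can be imported from pyspark.sql.utils, since that is the module the deco wrapper in the traceback above uses when it re-raises the Java error, so the exception that actually reaches my code is AnalysisException rather than Py4JJavaError:

from pyspark.sql import SparkSession
# Assumption: AnalysisException lives in pyspark.sql.utils in this Spark version,
# as suggested by the traceback (utils.py raises AnalysisException in deco()).
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.appName('abc').getOrCreate()

def read_dataset(spark, dataset_path):
    try:
        return spark.read.format('ORC').load(dataset_path)
    except AnalysisException as e:
        # str(e) should contain the message, e.g. "Path does not exist: ...;"
        print('Caught AnalysisException:', e)
        return None

read_dataset(spark, 'xyz')

Would catching AnalysisException like this be the right approach, or is there a better way to get just that message?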