Я использую SparkLauncher
для подключения к Spark в кластерном режиме поверх Yarn. Я запускаю код SQL, используя Scala, например:
def execute(code: String): Unit = {
try {
val resultDataframe = spark.sql(code)
resultDataframe.write.json("s3://some/prefix")
catch {
case NonFatal(f) =>
log.warn(s"Fail to execute query $code", f)
log.info(f.getMessage, getNestedStackTrace(f, Seq[String]()))
}
}
def getNestedStackTrace(e: Throwable, msg: Seq[String]): Seq[String] = {
if (e.getCause == null) return msg
getNestedStackTrace(e.getCause, msg ++ e.getStackTrace.map(_.toString))
}
Теперь, когда я запускаю запрос, который должен завершиться ошибкой с помощью метода execute()
, например, запрашивая секционированную таблицу без секционированный предикат - select * from partitioned_table_on_dt limit 1;
, я получаю неверную трассировку стека.
Правильная трассировка стека при запуске Spark. sql (code) .write. json () вручную из Spark-Shell:
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *(1) LocalLimit 1
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
...
Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: No partition predicate found for partitioned table
partitioned_table_on_dt.
If the table is cached in memory then turn off this check by setting
hive.mapred.mode to nonstrict
at org.apache.spark.sql.hive.execution.HiveTableScanExec.prunePartitions(HiveTableScanExec.scala:155)
...
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
...
Некорректная трассировка стека из метода execute () выше:
Job Aborted:
"org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)",
"org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)",
"org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)",
"org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)",
...
"org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)",
"org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)",
"org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)",
"org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)",
...
Трассировка стека Spark-Shell имеет три вложенных исключения SparkException
(SemanticException
(TreeNodeException
)), но трассировка, которую я вижу в своем коде, происходит только от SparkException
и TreeNodeException
, но самая ценная SemanticException
трассировка отсутствует даже после получения трассировок вложенного стека в методе getNestedStackTrace()
.
Могут ли специалисты по Spark / Scala сказать мне, что я делаю не так или как мне получить здесь полную трассировку стека со всеми исключениями?