Несмотря на то, что трассировка стека действительно происходит из вызова show()
, ключ остро стоит ...
...
HERE --> at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
...
at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
...
Вы все еще находитесь на этапе оптимизации запроса, который выполняется Catalyst в драйвере.
Причиной этого является плохо документированная особенность Spark, а именно то, что наборы данных, созданные из локальных коллекций с использованием SparkSession.createDataFrame()
(SparkSession.createDatset()
/ Seq.toDF()
в Scala), являются просто локальными отношениями внутри драйвера и в действительности не распределено:
scala> val df = (0 to 5).toDF
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.queryExecution.analyzed
res45: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107]
scala> df.isLocal
res46: Boolean = true
в отличие от наборов данных, созданных из RDD:
scala> val df_from_rdd = sc.parallelize(0 to 5).toDF
df_from_rdd: org.apache.spark.sql.DataFrame = [value: int]
scala> df_from_rdd.queryExecution.analyzed
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#112]
+- ExternalRDD [obj#111]
scala> df_from_rdd.isLocal
res48: Boolean = false
Такие операции, как Dataset.withColumn()
, фактически выполняются самим драйвером как часть отложенной оценки оптимизированного Запланируйте запрос и никогда не переходите к этапу выполнения:
scala> val df_foo = df.withColumn("foo", functions.callUDF("myUdf", $"value"))
df_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]
scala> df_foo.queryExecution.analyzed
res49: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#107, UDF:myUdf(cast(value#107 as string)) AS foo#146]
+- LocalRelation [value#107]
scala> df_foo.queryExecution.optimizedPlan
java.lang.Exception
at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(<console>:25)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
...
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1358)
...
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at $line143.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:27)
...
res50: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#132]
// Notice: the projection is gone, merged into the local relation
scala> df_foo.queryExecution.optimizedPlan
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#163]
// Notice: no stack trace this time
в отличие от работы с набором данных, созданным из RDD:
scala> val df_from_rdd_foo = df_from_rdd.withColumn("foo", functions.callUDF("myUdf", $"value"))
df_from_rdd_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]
scala> df_from_rdd_foo.queryExecution.optimizedPlan
res52: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#112, UDF:myUdf(cast(value#112 as string)) AS foo#135]
+- SerializeFromObject [input[0, int, false] AS value#112]
+- ExternalRDD [obj#111]
, который не производит трассировку стека в stderr исполнителя, то есть UDF не называется. С другой стороны:
scala> df_from_rdd_foo.show()
+-----+---------+
|value| foo|
+-----+---------+
| 0|0 changed|
| 1|1 changed|
| 2|2 changed|
| 3|3 changed|
| 4|4 changed|
| 5|5 changed|
+-----+---------+
создает следующую трассировку стека в stderr исполнителя:
java.lang.Exception
at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(<console>:25)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Spark рассматривает локальные отношения как литералы, что также можно увидеть по их виду представлен в SQL (код, адаптированный с здесь ):
scala> df.queryExecution.analyzed.collect { case r: LocalRelation => r }.head.toSQL("bar")
res55: String = VALUES (0), (1), (2), (3), (4), (5) AS bar(value)
scala> df_foo.queryExecution.optimizedPlan.collect { case r: LocalRelation => r }.head.toSQL("bar")
res56: String = VALUES (0, '0 changed'), (1, '1 changed'), (2, '2 changed'), (3, '3 changed'), (4, '4 changed'), (5, '5 changed') AS bar(value, foo)
или, альтернативно, в виде кода:
scala> df.queryExecution.analyzed.asCode
res57: String = LocalRelation(
List(value#107),
Vector([0,0], [0,1], [0,2], [0,3], [0,4], [0,5]),
false
)
scala> df_foo.queryExecution.analyzed.asCode
res58: String = Project(
List(value#107, UDF:myUdf(cast(value#107 as string)) AS foo#163),
LocalRelation(
List(value#107),
Vector([0,0], [0,1], [0,2], [0,3], [0,4], [0,5]),
false
)
)
scala> df_foo.queryExecution.optimizedPlan.asCode
res59: String = LocalRelation(
List(value#107, foo#163),
Vector([0,0 changed], [1,1 changed], [2,2 changed], [3,3 changed], [4,4 changed], [5,5 changed]),
false
)
Думайте о том, что происходит как эквивалент вашего Java компилятор заменяет код, такой как int a = 2 * 3;
на int a = 6;
фактическим вычислением, выполняемым компилятором.