Почему (в режиме «кластера») мой UDF выполняется локально (в драйвере) вместо этого на рабочих (и) - PullRequest
0 голосов
/ 06 апреля 2020

Работают два искровых рабочих, код выглядит следующим образом (JUnit:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.testng.annotations.Test;

public class UdfTest {

    @Test
    public void simpleUdf() {
        SparkConf conf = new SparkConf()
                .set("spark.driver.host", "localhost")
                .setMaster("spark://host1:7077")
                .set("spark.jars", "/home/.../myjar.jar")
                .set("spark.submit.deployMode", "cluster")
                .setAppName("RESTWS ML");

        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        List<Row> rows = new ArrayList<>();
        for (long i = 0; i < 10; i++) {
            rows.add(RowFactory.create("cr" + i));
        }

        Dataset<Row> textAsDataset = sparkSession.createDataFrame(rows,
            new StructType(new StructField[] { new StructField("contentRepositoryUUID", DataTypes.StringType, false, Metadata.empty()) }));

        sparkSession.udf().register("myUdf",
            (UDF1<String, String>)(col1) -> myUdf(col1), DataTypes.StringType);

        Dataset<Row> rowDataset = textAsDataset.withColumn("text", functions.callUDF("myUdf",
            textAsDataset.col("contentRepositoryUUID")
        ));
        rowDataset.show();
    }

    private String myUdf(String col1) {
        new Exception().printStackTrace();
        return col1 + " changed";
    }
}

Создан набор данных, и я ожидаю, что из рабочего java будет вызвана функция java myUdf() процессов, но вместо этого он вызывается из потока драйвера, трассировка стека происходит из строки rowDataset.show():

java.lang.Exception
    at UdfTest.myUdf(UdfTest.java:53)
    at UdfTest.lambda$simpleUdf$45ca9450$1(UdfTest.java:44)
    at org.apache.spark.sql.UDFRegistration$$anonfun$259.apply(UDFRegistration.scala:759)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:108)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:107)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1063)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:152)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:92)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24$$anonfun$applyOrElse$23.apply(Optimizer.scala:1364)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24$$anonfun$applyOrElse$23.apply(Optimizer.scala:1364)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24.applyOrElse(Optimizer.scala:1364)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$24.applyOrElse(Optimizer.scala:1359)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:259)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:259)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:248)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1359)
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1358)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
    at UdfTest.simpleUdf(UdfTest.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
    at org.testng.internal.Invoker.invokeMethod(Invoker.java:571)
    at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:707)
    at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:979)
    at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
    at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
    at org.testng.TestRunner.privateRun(TestRunner.java:648)
    at org.testng.TestRunner.run(TestRunner.java:505)
    at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
    at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
    at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
    at org.testng.SuiteRunner.run(SuiteRunner.java:364)
    at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
    at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
    at org.testng.TestNG.runSuitesSequentially(TestNG.java:1187)
    at org.testng.TestNG.runSuitesLocally(TestNG.java:1116)
    at org.testng.TestNG.runSuites(TestNG.java:1028)
    at org.testng.TestNG.run(TestNG.java:996)
    at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
    at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)

Как Spark решает, можно ли вызывать UDF из рабочих?

Странно то, что это уже работало один раз, но теперь, когда я пытался воспроизвести этот сценарий «распределенного UDF», что-то изменилось, и я не могу. Просмотр журналов Spark DEBUG мне, к сожалению, не помог.

1 Ответ

1 голос
/ 09 апреля 2020

Несмотря на то, что трассировка стека действительно происходит из вызова show(), ключ остро стоит ...

...
HERE --> at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
...
at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
...

Вы все еще находитесь на этапе оптимизации запроса, который выполняется Catalyst в драйвере.

Причиной этого является плохо документированная особенность Spark, а именно то, что наборы данных, созданные из локальных коллекций с использованием SparkSession.createDataFrame() (SparkSession.createDatset() / Seq.toDF() в Scala), являются просто локальными отношениями внутри драйвера и в действительности не распределено:

scala> val df = (0 to 5).toDF
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.queryExecution.analyzed
res45: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107]

scala> df.isLocal
res46: Boolean = true

в отличие от наборов данных, созданных из RDD:

scala> val df_from_rdd = sc.parallelize(0 to 5).toDF
df_from_rdd: org.apache.spark.sql.DataFrame = [value: int]

scala> df_from_rdd.queryExecution.analyzed
res47: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#112]
+- ExternalRDD [obj#111]

scala> df_from_rdd.isLocal
res48: Boolean = false

Такие операции, как Dataset.withColumn(), фактически выполняются самим драйвером как часть отложенной оценки оптимизированного Запланируйте запрос и никогда не переходите к этапу выполнения:

scala> val df_foo = df.withColumn("foo", functions.callUDF("myUdf", $"value"))
df_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]

scala> df_foo.queryExecution.analyzed
res49: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#107, UDF:myUdf(cast(value#107 as string)) AS foo#146]
+- LocalRelation [value#107]

scala> df_foo.queryExecution.optimizedPlan
java.lang.Exception
    at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(<console>:25)
    at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
    at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
    ...
    at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1358)
    ...
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
    at $line143.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:27)
    ...
res50: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#132]
// Notice: the projection is gone, merged into the local relation

scala> df_foo.queryExecution.optimizedPlan
res51: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
LocalRelation [value#107, foo#163]
// Notice: no stack trace this time

в отличие от работы с набором данных, созданным из RDD:

scala> val df_from_rdd_foo = df_from_rdd.withColumn("foo", functions.callUDF("myUdf", $"value"))
df_from_rdd_foo: org.apache.spark.sql.DataFrame = [value: int, foo: string]

scala> df_from_rdd_foo.queryExecution.optimizedPlan
res52: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#112, UDF:myUdf(cast(value#112 as string)) AS foo#135]
+- SerializeFromObject [input[0, int, false] AS value#112]
   +- ExternalRDD [obj#111]

, который не производит трассировку стека в stderr исполнителя, то есть UDF не называется. С другой стороны:

scala> df_from_rdd_foo.show()
+-----+---------+
|value|      foo|
+-----+---------+
|    0|0 changed|
|    1|1 changed|
|    2|2 changed|
|    3|3 changed|
|    4|4 changed|
|    5|5 changed|
+-----+---------+

создает следующую трассировку стека в stderr исполнителя:

java.lang.Exception
    at $line98.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.myUdf(<console>:25)
    at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
    at $line99.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:26)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    ...
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Spark рассматривает локальные отношения как литералы, что также можно увидеть по их виду представлен в SQL (код, адаптированный с здесь ):

scala> df.queryExecution.analyzed.collect { case r: LocalRelation => r }.head.toSQL("bar")
res55: String = VALUES (0), (1), (2), (3), (4), (5) AS bar(value)

scala> df_foo.queryExecution.optimizedPlan.collect { case r: LocalRelation => r }.head.toSQL("bar")
res56: String = VALUES (0, '0 changed'), (1, '1 changed'), (2, '2 changed'), (3, '3 changed'), (4, '4 changed'), (5, '5 changed') AS bar(value, foo)

или, альтернативно, в виде кода:

scala> df.queryExecution.analyzed.asCode
res57: String = LocalRelation(
  List(value#107),
  Vector([0,0], [0,1], [0,2], [0,3], [0,4], [0,5]),
  false
)

scala> df_foo.queryExecution.analyzed.asCode
res58: String = Project(
  List(value#107, UDF:myUdf(cast(value#107 as string)) AS foo#163),
  LocalRelation(
    List(value#107),
    Vector([0,0], [0,1], [0,2], [0,3], [0,4], [0,5]),
    false
  )
)

scala> df_foo.queryExecution.optimizedPlan.asCode
res59: String = LocalRelation(
  List(value#107, foo#163),
  Vector([0,0 changed], [1,1 changed], [2,2 changed], [3,3 changed], [4,4 changed], [5,5 changed]),
  false
)

Думайте о том, что происходит как эквивалент вашего Java компилятор заменяет код, такой как int a = 2 * 3; на int a = 6; фактическим вычислением, выполняемым компилятором.

...