Что такое ExternalRDDScan в DAG? - PullRequest
       35

Что такое ExternalRDDScan в DAG?

5 голосов
/ 01 октября 2019

Что означает ExternalRDDScan в DAG?

Весь Интернет не имеет объяснения этому.

enter image description here

Ответы [ 2 ]

5 голосов
/ 01 октября 2019

Исходя из источника , ExternalRDDScan является представлением преобразования существующей СДР произвольных объектов в набор данных InternalRow с, то есть создания DataFrame. Давайте проверим правильность нашего понимания:

scala> import spark.implicits._
import spark.implicits._

scala> val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.toDF().explain()
== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
1 голос
/ 13 октября 2019

ExternalRDD - это логическое представление DataFrame / Dataset (но не во всех случаях) в плане выполнения запроса, т.е. в DAG, созданной искрой.

ExternalRDD (s) создаются

  1. при создании DataFrame из RDD (, то есть с использованием createDataFrame (), toDF () )
  2. при создании DataSet из RDD (, то есть с использованием createDataSet (), toDS () )

Во время выполнения, когда ExternalRDD должен быть загружен в память,выполняется операция сканирования, которая представлена ​​ExternalRDDScan (внутренне стратегия сканирования разрешена в ExternalRDDScanExec). Посмотрите на пример ниже:

scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5))
sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> sampleRDD.toDF.queryExecution
res0: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan[obj#1]

Вы можете видеть, что в плане выполнения запроса объект DataFrame представлен ExternalRDD, а физический план содержит операцию сканирования , котораяразрешается к ExternalRDDScan (ExternalRDDScanExec) во время его выполнения.

То же самое относится и к набору искровых данных.

scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5))
sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> sampleRDD.toDS.queryExecution.logical
res9: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#23]
+- ExternalRDD [obj#22]

scala> spark.createDataset(sampleRDD).queryExecution.logical
res18: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#39]
+- ExternalRDD [obj#38]

Приведенные выше примеры былизапустить в версии 2.4.2

Ссылка: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-LogicalPlan-ExternalRDD.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...