Следующие пункты:
rdd.toDebugString
предназначены только для СДР до исполнения.
Выполнение DAG это то, что вы можете наблюдать во время выполнения для RDD и Dataframes через Spark Web UI . См. Новый выпуск: https://spark.apache.org/docs/3.0.0-preview/web-ui.html
До выполнения вы можете запустить .explain
для Dataframes.
- Из хорошего источника:
Spark SQL Оператор EXPLAIN предоставляет подробную информацию о плане для оператора sql без его фактического выполнения. Вы можете использовать оператор Spark SQL EXPLAIN, чтобы отобразить фактический план выполнения, который механизм исполнения Spark сгенерирует и использует при выполнении любого запроса. Вы можете использовать этот план выполнения для оптимизации ваших запросов.
Простой пример для Dataframe:
import org.apache.spark.sql.Row
val dfsFilename = "/FileStore/tables/sample_text.txt"
val readFileDF = spark.sparkContext.textFile(dfsFilename)
val wordsDF = readFileDF.flatMap(_.split(" ")).toDF
val wcounts3 = wordsDF.filter(r => (r(0) != "Humpty") || (r(0) != "Dumpty"))
.groupBy("Value") // Note the value
.count().explain()
Вы помечаете оператор соответствующим образом, но не в show (), для Dataframe / Dataset.
== Physical Plan ==
*(2) HashAggregate(keys=[Value#4], functions=[finalmerge_count(merge count#14L) AS count(1)#8L])
+- Exchange hashpartitioning(Value#4, 200), [id=#61]
+- *(1) HashAggregate(keys=[Value#4], functions=[partial_count(1) AS count#14L])
+- *(1) Filter <function1>.apply
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#4]
+- Scan[obj#3]
Ваш конкретный c вопрос: не представляется возможным и, возможно, не совсем допустимым, так как существует небольшая оптимизация для рассмотрения.