Scala - Получить DAG с этапами и заданиями без выполнения - PullRequest
1 голос
/ 03 апреля 2020

Я ищу способ получения DAG приложения Scala Spark с использованием RDD, включая этапы и задачи.

Я пробовал rdd.toDebugString, но он показывает только происхождение RDD, а не DAG я ищу.

Я знаю, что есть веб-интерфейс, который отображает DAG, но я хочу извлечь DAG из кода, аналогично тому, как это делает функция explain для фрейма данных. .

Ответы [ 2 ]

1 голос
/ 04 апреля 2020

Следующие пункты:

  • rdd.toDebugString предназначены только для СДР до исполнения.

  • Выполнение DAG это то, что вы можете наблюдать во время выполнения для RDD и Dataframes через Spark Web UI . См. Новый выпуск: https://spark.apache.org/docs/3.0.0-preview/web-ui.html

  • До выполнения вы можете запустить .explain для Dataframes.

  • Из хорошего источника:

Spark SQL Оператор EXPLAIN предоставляет подробную информацию о плане для оператора sql без его фактического выполнения. Вы можете использовать оператор Spark SQL EXPLAIN, чтобы отобразить фактический план выполнения, который механизм исполнения Spark сгенерирует и использует при выполнении любого запроса. Вы можете использовать этот план выполнения для оптимизации ваших запросов.

Простой пример для Dataframe:

import org.apache.spark.sql.Row     
val dfsFilename = "/FileStore/tables/sample_text.txt"
val readFileDF = spark.sparkContext.textFile(dfsFilename)  
val wordsDF = readFileDF.flatMap(_.split(" ")).toDF
val wcounts3 = wordsDF.filter(r => (r(0) != "Humpty") || (r(0) != "Dumpty"))
                      .groupBy("Value") // Note the value
                      .count().explain()

Вы помечаете оператор соответствующим образом, но не в show (), для Dataframe / Dataset.

== Physical Plan ==
*(2) HashAggregate(keys=[Value#4], functions=[finalmerge_count(merge count#14L) AS count(1)#8L])
+- Exchange hashpartitioning(Value#4, 200), [id=#61]
   +- *(1) HashAggregate(keys=[Value#4], functions=[partial_count(1) AS count#14L])
      +- *(1) Filter <function1>.apply
         +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#4]
            +- Scan[obj#3]

Ваш конкретный c вопрос: не представляется возможным и, возможно, не совсем допустимым, так как существует небольшая оптимизация для рассмотрения.

0 голосов
/ 05 апреля 2020

В дополнение к функции explain вы также можете просматривать веб-интерфейс при выполнении приложения - если вы работаете локально, он должен быть на http://localhost:4040/ (как описано в документации здесь: https://spark.apache.org/docs/latest/monitoring.html). Он предоставляет список заданий, визуализацию DAG для каждого задания, конфигурации и т. Д. c.

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...