Spark использует Lazy Evaluation, что позволяет движку оптимизировать преобразования СДР на очень детальном уровне.
Когда вы выполните
val DF1= spark.sql("select x,y from A,B ")
ничего не происходит, за исключением того, что преобразование добавлено в направленный ациклический граф.
Только при выполнении действия, такого как DF1.count, драйвер вынужден выполнять физический план выполнения. Это откладывается как можно дальше вниз по цепочке преобразований СДР.
Поэтому не правильно спрашивать
1. Сколько времени действительно потребовалось для вычисления DF1
2. Сколько времени нужно для вычисления DF2 и
хотя бы на основе предоставленных вами примеров кода. Ваш код не "вычислил" val DF1. Мы можем не знать, сколько времени заняла обработка только DF1, если вы каким-то образом не обманули компилятор в обработке каждого кадра данных отдельно.
Лучшим способом структурирования вопроса может быть «сколько этапов (задач) делится на мою работу в целом и сколько времени требуется, чтобы завершить эти этапы (задачи)»?
И на это можно легко ответить, посмотрев временную шкалу лог-файлов / веб-интерфейса (в зависимости от вашей настройки).
3.Как много времени, чтобы сохранить эти последние присоединения к Hive / HDFS
Справедливый вопрос. Проверьте Ganglia
Инструменты мониторинга на уровне кластера, такие как Ganglia, могут дать представление об общем использовании кластера и узких местах ресурсов. Например, панель инструментов Ganglia может быстро определить, связана ли конкретная рабочая нагрузка с диском, сетью или ЦП.
Еще один прием, который мне нравится использовать: он определяет каждую последовательность преобразований, которые должны заканчиваться действием внутри отдельной функции, а затем вызывает эту функцию на входном СДР внутри блока «функция таймера».
Например, мой «таймер» определен как таковой
def time[R](block: => R): R = {
val t0 = System.nanoTime()
val result = block
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0)/1e9 + "s")
result
}
и может использоваться как
val df1 = Seq((1,"a"),(2,"b")).toDF("id","letter")
scala> time{df1.count}
Elapsed time: 1.306778691s
res1: Long = 2
Однако не вызывайте ненужных действий, чтобы просто разбить DAG на несколько этапов / широких зависимостей. Это может привести к случайным изменениям или замедлить выполнение.
Ресурсы:
https://spark.apache.org/docs/latest/monitoring.html
http://ganglia.sourceforge.net/
https://www.youtube.com/watch?v=49Hr5xZyTEA