Каков наилучший способ сбора статистики запуска заданий Spark и сохранения в базе данных - PullRequest
0 голосов
/ 26 апреля 2018

В моей программе Spark есть несколько объединений таблиц (с использованием SPARKSQL), и я хотел бы собрать время, затраченное на обработку каждого из этих объединений, и сохранить его в статистической таблице. Цель состоит в том, чтобы запустить его непрерывно в течение определенного периода времени и повысить производительность на очень детальном уровне.

например,

val DF1 = spark.sql («выберите x, y из A, B»)

Val DF2 = spark.sql («выберите k, v из TABLE1, TABLE2»)

наконец, я присоединяюсь к DF1 и DF2, а затем инициирую действие, подобное saveAsTable.

Что мне нужно, так это выяснить

1. Сколько времени действительно потребовалось для вычисления DF1

2. Сколько времени нужно для вычисления DF2 и

3.Как много времени, чтобы сохранить эти последние присоединения к Hive / HDFS

и поместите всю эту информацию в таблицу / файл RUN-STATISTICS.

Любая помощь приветствуется и спасибо заранее

1 Ответ

0 голосов
/ 26 апреля 2018

Spark использует Lazy Evaluation, что позволяет движку оптимизировать преобразования СДР на очень детальном уровне.

Когда вы выполните

val DF1= spark.sql("select x,y from A,B ")

ничего не происходит, за исключением того, что преобразование добавлено в направленный ациклический граф.

Только при выполнении действия, такого как DF1.count, драйвер вынужден выполнять физический план выполнения. Это откладывается как можно дальше вниз по цепочке преобразований СДР.

Поэтому не правильно спрашивать

1. Сколько времени действительно потребовалось для вычисления DF1

2. Сколько времени нужно для вычисления DF2 и

хотя бы на основе предоставленных вами примеров кода. Ваш код не "вычислил" val DF1. Мы можем не знать, сколько времени заняла обработка только DF1, если вы каким-то образом не обманули компилятор в обработке каждого кадра данных отдельно.

Лучшим способом структурирования вопроса может быть «сколько этапов (задач) делится на мою работу в целом и сколько времени требуется, чтобы завершить эти этапы (задачи)»?

И на это можно легко ответить, посмотрев временную шкалу лог-файлов / веб-интерфейса (в зависимости от вашей настройки).

3.Как много времени, чтобы сохранить эти последние присоединения к Hive / HDFS

Справедливый вопрос. Проверьте Ganglia

Инструменты мониторинга на уровне кластера, такие как Ganglia, могут дать представление об общем использовании кластера и узких местах ресурсов. Например, панель инструментов Ganglia может быстро определить, связана ли конкретная рабочая нагрузка с диском, сетью или ЦП.

Еще один прием, который мне нравится использовать: он определяет каждую последовательность преобразований, которые должны заканчиваться действием внутри отдельной функции, а затем вызывает эту функцию на входном СДР внутри блока «функция таймера».

Например, мой «таймер» определен как таковой

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block  
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0)/1e9 + "s")
  result
}

и может использоваться как

val df1 = Seq((1,"a"),(2,"b")).toDF("id","letter")

scala> time{df1.count}
Elapsed time: 1.306778691s
res1: Long = 2

Однако не вызывайте ненужных действий, чтобы просто разбить DAG на несколько этапов / широких зависимостей. Это может привести к случайным изменениям или замедлить выполнение.

Ресурсы:

https://spark.apache.org/docs/latest/monitoring.html

http://ganglia.sourceforge.net/

https://www.youtube.com/watch?v=49Hr5xZyTEA

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...