Я обнаружил неожиданный дисковый ввод-вывод (всплеск DISKBUSY) после того, как все мои задачи зажигания были завершены, но контекст искры не остановился - как показано на рисунке 2 в 21:56:47
.Может ли кто-нибудь помочь объяснить это и дать предложения о том, как этого избежать или отложить?Или у контекста искры есть периодические асинхронные операции ввода-вывода, которые могут привести к всплескам?Спасибо!
Приведен пример запуска пакетного задания SparkSQL в двух случаях.В первом из них я выполняю рабочую нагрузку sql и останавливаю контекст spark сразу после завершения действия .show()
.Во втором случае я добавляю 1-минутный сон после .show()
, используя Thread.sleep(60000)
, затем прекращаю контекст искры.Результат показывает, что затраты времени на выполнение рабочей нагрузки sql в двух случаях одинаковы, но есть неожиданный всплеск DISKBUSY на диске , который выполняет локальное хранилище для произвольной записи во втором случае.См. Всплеск на рисунке в случае 2.
Вот более подробная информация.
Настройка системы
- Spark 2.3.1, Hadoop 2.9.1, Hive2.3.4 для хранения метаданных.
- Один главный и два рабочих узла (worker1 и worker2).Каждый узел имеет достаточно доступных ресурсов (32 ядра, память 750 ГБ и 8 дисков 8-T от диска 1 до диска 8).
- HDFS развернута на диске8;Диск1 используется для локального хранения записи в случайном порядке.
- Я использую Yarn в качестве управления кластером.
- Я использую инструмент системного монитора "nmon" для определения активности диска.
- В бэкэнде не работает ни одно другое большое приложение.
- Я использую режим
yarn client
при отправке своего кода.Я использую 8 исполнителей, каждое из которых имеет 4 ядра и 8 ГБ памяти. - Отметим, что я поместил локальный файл HDFS и Yarn на два разных диска - каталог
yarn_local
находится на диске каждого рабочего1, а HDFS развернута на диске 8из двух рабочих узлов.Каждый диск имеет 8T
.Таким образом, можно различить действия для HDFS и локального диска.
Вот мой текущий анализ
- Он не вызван самим диском и другими фоновыми процессами.Я попытался использовать disk2, disk3, disk4 и disk8 для локального хранения пряжи, чтобы проверить, связан ли спайк с программой, и он показывает одинаковые пики каждый раз, когда я выполняю дело 2.
- Пик вызван Sparkсам.Я попробовал автономный режим развертывания, и шип все еще существует (без пряжи).
- Это может иметь отношение к перетасовке.Общий размер случайной записи моей целевой пакетной работы близок к
2GB
.Я также пробовал различную рабочую нагрузку с размером записи в случайном порядке, близким к 1MB
, 250MB
и 1GB
.DISKBUSY становится незначительным для пакетного задания с размером записи в случайном порядке 1MB
и становится равным 80%
для пакетного задания с общим размером записи в случайном порядке 250MB
. - Размер файла локального хранилища отслеживается.Когда появляется всплеск диска, запись на диск обнаруживается, но размер диска не увеличивается.Следовательно, (1) это может не иметь отношения к очистке дискового кэша (2) это может быть некоторая замена диска (не слишком уверен).
Согласно моему анализу в настоящее время, я подозреваю, что это должно бытьвызвано чем-то, что мне не знакомо - например, какое-то искро-асинхронное поведение на дисках.Может ли кто-нибудь помочь объяснить это?Спасибо!
Вот первый случай.
Вот второй случай.
Для большей наглядности на рисунке worker1 node local
обозначает диск 1 в worker1, the worker2 local
обозначает диск 1 в worker2;worker1 node dfs
обозначает диск 8 в worker1, а worker2 node dfs
обозначает диск 8 в worker2, где находится HDFS.Левая ось Y - это дисковая занятость (от 0% до 100%), обнаруженная nmon
, а правая ось Y - это размер каталога для hdfs на диске 8 (который мы можем просто проигнорировать для этой проблемы).
Вот мой код.
import org.apache.spark.sql.SparkSession
object Q16 {
def main(args: Array[String]): Unit = {
val db = s"bigbench_sf_100"
val spark = SparkSession
.builder()
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
spark.sql(s"use $db")
val t1 = System.currentTimeMillis()
spark.sql(
s"""
|SELECT w_state, i_item_id,
| SUM(
| CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
| THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
| ELSE 0.0 END
| ) AS sales_before,
| SUM(
| CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
| THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
| ELSE 0.0 END
| ) AS sales_after
|FROM (
| SELECT *
| FROM web_sales ws
| LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
| AND ws.ws_item_sk = wr.wr_item_sk)
|) a1
|JOIN item i ON a1.ws_item_sk = i.i_item_sk
|JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
|JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
|AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >= unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
|AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
|GROUP BY w_state,i_item_id
|--original was ORDER BY w_state,i_item_id , but CLUSTER BY is hives cluster scale counter part
|ORDER BY w_state,i_item_id
|LIMIT 100
""".stripMargin).show
val t2 = System.currentTimeMillis()
// For case 2
// Thread.sleep(60 * 1000)
spark.stop()
}
}