Почему происходит скачок занятости диска между завершением работы и выключением Spark? - PullRequest
3 голосов
/ 18 марта 2019

Я обнаружил неожиданный дисковый ввод-вывод (всплеск DISKBUSY) после того, как все мои задачи зажигания были завершены, но контекст искры не остановился - как показано на рисунке 2 в 21:56:47.Может ли кто-нибудь помочь объяснить это и дать предложения о том, как этого избежать или отложить?Или у контекста искры есть периодические асинхронные операции ввода-вывода, которые могут привести к всплескам?Спасибо!

Приведен пример запуска пакетного задания SparkSQL в двух случаях.В первом из них я выполняю рабочую нагрузку sql и останавливаю контекст spark сразу после завершения действия .show().Во втором случае я добавляю 1-минутный сон после .show(), используя Thread.sleep(60000), затем прекращаю контекст искры.Результат показывает, что затраты времени на выполнение рабочей нагрузки sql в двух случаях одинаковы, но есть неожиданный всплеск DISKBUSY на диске , который выполняет локальное хранилище для произвольной записи во втором случае.См. Всплеск на рисунке в случае 2.

Вот более подробная информация.

Настройка системы

  • Spark 2.3.1, Hadoop 2.9.1, Hive2.3.4 для хранения метаданных.
  • Один главный и два рабочих узла (worker1 и worker2).Каждый узел имеет достаточно доступных ресурсов (32 ядра, память 750 ГБ и 8 дисков 8-T от диска 1 до диска 8).
  • HDFS развернута на диске8;Диск1 используется для локального хранения записи в случайном порядке.
  • Я использую Yarn в качестве управления кластером.
  • Я использую инструмент системного монитора "nmon" для определения активности диска.
  • В бэкэнде не работает ни одно другое большое приложение.
  • Я использую режим yarn client при отправке своего кода.Я использую 8 исполнителей, каждое из которых имеет 4 ядра и 8 ГБ памяти.
  • Отметим, что я поместил локальный файл HDFS и Yarn на два разных диска - каталог yarn_local находится на диске каждого рабочего1, а HDFS развернута на диске 8из двух рабочих узлов.Каждый диск имеет 8T.Таким образом, можно различить действия для HDFS и локального диска.

Вот мой текущий анализ

  1. Он не вызван самим диском и другими фоновыми процессами.Я попытался использовать disk2, disk3, disk4 и disk8 для локального хранения пряжи, чтобы проверить, связан ли спайк с программой, и он показывает одинаковые пики каждый раз, когда я выполняю дело 2.
  2. Пик вызван Sparkсам.Я попробовал автономный режим развертывания, и шип все еще существует (без пряжи).
  3. Это может иметь отношение к перетасовке.Общий размер случайной записи моей целевой пакетной работы близок к 2GB.Я также пробовал различную рабочую нагрузку с размером записи в случайном порядке, близким к 1MB, 250MB и 1GB.DISKBUSY становится незначительным для пакетного задания с размером записи в случайном порядке 1MB и становится равным 80% для пакетного задания с общим размером записи в случайном порядке 250MB.
  4. Размер файла локального хранилища отслеживается.Когда появляется всплеск диска, запись на диск обнаруживается, но размер диска не увеличивается.Следовательно, (1) это может не иметь отношения к очистке дискового кэша (2) это может быть некоторая замена диска (не слишком уверен).

Согласно моему анализу в настоящее время, я подозреваю, что это должно бытьвызвано чем-то, что мне не знакомо - например, какое-то искро-асинхронное поведение на дисках.Может ли кто-нибудь помочь объяснить это?Спасибо!

Вот первый случай.Case 1: without sleep

Вот второй случай.Case 2: with 60 seconds sleep

Для большей наглядности на рисунке worker1 node local обозначает диск 1 в worker1, the worker2 local обозначает диск 1 в worker2;worker1 node dfs обозначает диск 8 в worker1, а worker2 node dfs обозначает диск 8 в worker2, где находится HDFS.Левая ось Y - это дисковая занятость (от 0% до 100%), обнаруженная nmon, а правая ось Y - это размер каталога для hdfs на диске 8 (который мы можем просто проигнорировать для этой проблемы).

Вот мой код.

import org.apache.spark.sql.SparkSession
object Q16 {
  def main(args: Array[String]): Unit = {
    val db = s"bigbench_sf_100"

    val spark = SparkSession
      .builder()
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    spark.sql(s"use $db")

    val t1 = System.currentTimeMillis()
    spark.sql(
      s"""
         |SELECT w_state, i_item_id,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_before,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_after
         |FROM (
         |  SELECT *
         |  FROM web_sales ws
         |  LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
         |    AND ws.ws_item_sk = wr.wr_item_sk)
         |) a1
         |JOIN item i ON a1.ws_item_sk = i.i_item_sk
         |JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
         |JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >= unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
         |GROUP BY w_state,i_item_id
         |--original was ORDER BY w_state,i_item_id , but CLUSTER BY is hives cluster scale counter part
         |ORDER BY w_state,i_item_id
         |LIMIT 100
       """.stripMargin).show
    val t2 = System.currentTimeMillis()

//    For case 2
//    Thread.sleep(60 * 1000)

    spark.stop()
  }
}

1 Ответ

1 голос
/ 28 марта 2019

Я выясняю причину неожиданной активности ввода-вывода.

Это поведение кэша буфера файловой системы. Как правило, когда процесс записывает данные в файл, данные не записываются на диск сразу, а вместо этого записываются в кэш-память. Этот кэш поддерживается операционной системой / файловой системой как оптимизация производительности, поскольку он позволяет возвращать запросы записи после записи в память и не ждать завершения медленных операций ввода-вывода. Эти грязные данные периодически сбрасываются на диск в фоновом режиме операционной системой.

Таким образом, в основном действия на диске (сброс) невозможно избежать, если только страницы файлов не удаляются при кэшировании в буфере диска (в случае 1).

Вы можете немедленно принудительно записать все грязные данные, используя системную команду Linux sync.

...