Как разные блоки файла обрабатываются параллельно на отдельных узлах? - PullRequest
2 голосов
/ 09 июня 2019

Рассмотрим приведенный ниже пример программы для справки

val text = sc.textFile("file_from_local_system.txt");// or  file can also be on hdfs
val counts = text.flatMap(line => line.split(" ")
 ).map(word => (word,1)).reduceByKey(_+_) counts.collect

Мое понимание: -

  1. Драйверная программа создает график происхождения (LG) / рассчитываетзадание, этапы и задачи.
  2. Затем попросите менеджер кластера (скажем, отдельный автономный менеджер кластера) выделить ресурс на основе задач.

Надеюсь, это правильно?

Вопрос: -

Мой вопрос на шаге_1.Чтобы рассчитать количество задач, которые могут выполняться параллельно, программа-драйвер (DP) также должна знать количество блоков, хранящихся на диске для этого файла.

Знает ли DP об этом при создании LG, и затем задачи внутри содержат адрес каждого блока, чтобы каждый из них мог выполняться параллельно на отдельном узле?

Ответы [ 3 ]

0 голосов
/ 10 июня 2019

Довольно интересный и не очень тривиальный вопрос!После более глубокого погружения в основной источник Spark (2,4x), вот мое понимание и ответ на ваш вопрос:

  1. Общие знания:

    • основной точкой входа для всех действий Spark является SparkContext.
    • Планировщик Dag создается изнутри SparkContext.
    • SparkContext имеет метод runJob, который сам сообщает планировщику Dag вызвать его метод runJob.Он вызывается для данного СДР и соответствующих ему разделов.
    • Планировщик Dag строит график выполнения на основе этапов, представленных в виде TaskSets.
    • Подсказка: Планировщик Dag может получать информацию о расположенииblockIds путем связи с BlockManagerMaster.
    • Планировщик Dag также использует низкоуровневый TaskScheduler, который содержит отображение между идентификатором задачи и идентификатором исполнителя.
    • Отправка задач в TaskScheduler соответствует построениюTaskSets для этапа, затем вызывающего TaskSetManager.
    • Интересно знать: Зависимости заданий управляются планировщиком DAG, локальность данных управляется TaskScheduler.
    • Задачи - это отдельные единицы работы,каждый отправляется на одну машину (исполнитель).
  2. Давайте посмотрим на Task.run ()

    • Он регистрирует задачу в BlockManager: SparkEnv.get.blockManager.registerTask(taskAttemptId)
    • Затем он создает TaskContextImpl () в качестве контекста и вызывает runTask (context)
    • Класс ResultTask и класс ShuffleMapTask оба переопределяют этот runTask ()
    • У нас есть один ResultTask на раздел
    • Наконец, данные десериализуются в rdd.
  3. С другой стороны, у нас есть семейство менеджеров блоков:

    • Каждый исполнитель, включая драйвер, имеет BlockManager.
    • BlockManagerMaster запускается на драйвере.
    • BlockManagerMasterEndpoint - это конечная точка rpc, доступная через BlockManagerMaster.
    • BlockManagerMaster доступен через службу SparkEnv.
    • Когда исполнителю предлагается запустить launchTask (), он создает TaskRunner и добавляет его во внутренний набор runningTasks.
    • TaskRunner.run () вызывает task.run ()
  4. Итак, что происходит при запуске задачи?

    • blockId извлекается из taskId
    • результаты сохраняются вBlockManager использует: env.blockManager.putBytes(blockId, <the_data_buffer_here>, <storage_level_here>, tellMaster=true)
    • Метод putBytes сам вызывает: doPut(blockId, level, classTag, tellMaster, keepReadLock), который сам решает сохранить в памяти или на диске, в зависимости от уровня хранения.
    • Наконец,удалить идентификатор задачи из списка текущих задач.
  5. Теперь вернемся к вашему вопросу:

    • при вызове API-интерфейса разработчика как: sc.textFile(<my_file>), вы можете указать второй параметр для установкиколичество разделов для вашего rdd (или полагаться на параллелизм по умолчанию).
    • Например: rdd = sc.textFile("file_from_local_system.txt", 10)
    • Добавить несколько шагов карты / фильтра, например.
    • Контекст Sparkимеет свою структуру Дага.При вызове действия - например, rdd.count () - некоторые этапы, содержащие наборы задач, передаются исполнителям.
    • TaskScheduler обрабатывает локальность данных блоков.
    • Если исполнитель, выполняющий задачу, имеет блокданные локально, он будет использовать их, в противном случае получит их для удаленного доступа.
    • У каждого исполнителя есть свой BlockManager.BlockManager также является BlockDataManager, который имеет атрибут RDDBlockId.RDDBlockId описывается идентификатором RDD (rddId) и индексом раздела (splitIndex).RDDBlockId создается, когда RDD запрашивается для получения или вычисления раздела RDD (идентифицируемого splitIndex).

Надеюсь, это поможет!Пожалуйста, исправьте меня, если я ошибаюсь / приблизительный по любому из этих пунктов.

Удачи!

Ссылки:

0 голосов
/ 12 июня 2019

Мой вопрос на шаге_1. Чтобы рассчитать количество задач, которые могут выполняться параллельно, программа-драйвер (DP) также должна знать количество блоков, хранящихся на диске для этого файла.

Знает ли DP об этом при создании LG, и затем задачи внутри содержат адрес каждого блока, чтобы каждый мог выполняться параллельно на отдельном узле?

Да, это называется "разбиение". Есть вызов Hadoop Filesystem API getBlockLocations, который показывает, как файл разбивается на блоки, и имена хостов, на которых хранятся копии. Каждый формат файла также объявляет, является ли формат файла «разделяемым» на основе формата (text, CSV, PArquet, ORC == yes) и может ли сжатие также разделяться (snappy yes, gzip no)

Затем драйвер Spark делит обработку по файлам и по количеству разделений, которые он может выполнить для каждого файла, а затем планирует работу над доступными рабочими процессами, «близкими» к данным.

Для HDFS разбиение / расположение блоков определяется при записи файлов: они записываются в виде блоков (конфигурируются) и распределяются по кластеру.

Для хранилищ объектов нет реального разделения или местоположения; каждый клиент имеет некоторую опцию конфигурации для управления размером блока, который он объявляет (например, fs.s3a.blocksize), и просто говорит «localhost» для местоположения. Spark знает, что когда он видит localhost, он означает «где угодно»

0 голосов
/ 10 июня 2019

Этот вопрос на самом деле сложнее, чем можно подозревать.

  • Это мое понимание случая HDFS, на который вы ссылаетесь, когда узел данных является рабочим узлом. Итак, я исключаю из этого обсуждения S3 и хранилище BLOB-объектов AZURE, 2-го поколения и т. Д., То есть в этом объяснении предполагается принцип Data Locality, который с облачными вычислениями устареет, если только высокая производительность не уйдет.

  • Ответ также исключает аспекты перераспределения и сокращения, которые также влияют на вещи, а также на динамическое распределение ресурсов YARN, поэтому он принимает YARN в качестве диспетчера кластеров.

Вот так:

Распределение ресурсов

  • Они распределяются заранее драйвером, запрашивающим их у YARN, то есть до физического создания группы DAG, основанной на этапах, содержащих задачи. Подумайте о параметрах на spark-submit, например.
  • Следовательно, ваш 2-й пункт не совсем верен.
  • В зависимости от режима обработки, допустим, в кластерном режиме YARN вы получите полное распределение ресурсов.
    • например. если у вас есть кластер, скажем, 5 узлов данных / рабочих, с 20 процессорами (40 ядер), то, если вы просто отправите и используете значения по умолчанию, вы, скорее всего, получите приложение Spark (для N действий), которое имеет 5 x 1 ядро ​​в всего выделено 1 для каждого узла данных / рабочих.
  • Полученные ресурсы обычно полностью удерживаются за каждую работу Spark.
  • Spark Job - это действие, которое является частью Spark App. Приложение Spark может иметь N действий, которые обычно запускаются последовательно.
  • Обратите внимание, что задание может продолжаться, если все ресурсы не могут быть выделены.

(Водитель) Исполнение

  • Предположим, ваш файл может иметь 11 разделов, 2 раздела для 4 узлов и 1 раздел для 5-го узла данных / рабочих, скажем.
  • Затем в терминах Spark файл, который вы указываете с помощью sc.textfile, обрабатывается с использованием двоичных файлов Hadoop, которые работают на основе задач для каждого блока файла, что означает, что драйвер сгенерирует задачи - всего 11 для Начальная ступень. Первая стадия заключается в том, что перед Shuffling требуется уменьшить.
    • Драйвер, таким образом, получает информацию и выдает множество заданий для каждой стадии, которые (конвейерно) и устанавливаются для выполнения последовательно этим ядром = Исполнителем для этого рабочего узла.
    • У каждого рабочего / узла данных может быть больше исполнителей, что будет означать более быстрое выполнение и, следовательно, пропускную способность.
  • Это показывает, что мы можем тратить ресурсы. Распределение по умолчанию в 1 ядро ​​на узел Data / Worker может быть расточительным для файлов меньшего размера или, как результат, искаженных данных после перераспределения. Но это для дальнейшего рассмотрения.

Другие соображения

  • Можно ограничить количество исполнителей на приложение и, следовательно, работу. Если вы выберете достаточно низкое число, т. Е. Меньше, чем количество узлов в вашем кластере, и файл будет распределен по всем узлам, вам потребуется перенести данные с рабочего узла / узла данных на другой такой узел. Это не Shuffle, кстати.
  • S3 - хранилище AWS, и данные отделены от рабочего узла. Это связано с Compute Elasticity .
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...