Операция подсчета, приводящая к большему количеству pyspark rack_local - PullRequest
1 голос
/ 12 апреля 2019

Я пытаюсь понять уровень локальности кластера Spark и его связь с количеством разделов RDD вместе с действием, выполняемым на нем. В частности, у меня есть фрейм данных, в котором количество разделов равно 9647. Затем я выполнил df.count на нем и заметил следующее в пользовательском интерфейсе Spark:

enter image description here

Немного контекста. Я отправил свою работу в кластер Yarn со следующей конфигурацией:

- executor_memory='10g',
- driver_memory='10g',
- num_executors='5',
- executor_cores=5'

Также я заметил, что все исполнители были с 5 разных узлов (хостов).

Из рисунка я обнаружил, что из всех 9644 задач более 95% не выполнялись на одном узле. Итак, мне просто интересно, почему так много с rack_local. В частности, почему бы узлу не выбрать ближайший источник данных для выполнения, другими словами, иметь больше локальных узлов?

Спасибо

1 Ответ

0 голосов
/ 14 апреля 2019

Вот несколько моментов для рассмотрения.

Ниже вы можете найти некоторые факторы, влияющие на локальность данных в Spark:

  1. Изначально Spark попытается разместить задачу как можно ближе кузел, где существуют исходные данные.Например, если исходной системой является HDFS, Spark попытается выполнить задачу в том же узле, где существуют данные определенного раздела.Spark найдет предпочтительное местоположение для каждого СДР, введя getPreferredLocations.Позже TaskScheduler будет использовать эту информацию для принятия решения о местонахождении задачи.В определении RDD вы можете найти определение getPreferredLocations, которое отвечает за указание оптимального местоположения RDD.Например, если источником является HDFS, Spark создаст экземпляр HadoopRDD (или NewHadoopRDD) и получит доступ к Hadoop API для получения информации о расположении исходных файлов , переопределяющей функцию getPreferredLocations изего базовый класс.
  2. Основной причиной невозможности достижения высокой локализации, например: PROCESS_LOCAL или NODE_LOCAL, является нехватка ресурсов в целевом узле.Spark использует настройку spark.locality.wait для установки времени ожидания , в котором должно быть принято решение об уровне locality .Spark будет использовать этот параметр, чтобы подождать определенное время, пока ресурсы станут доступными.Если после истечения интервала spark.locality.wait на узле нет доступных ресурсов (ядер) , то Spark понизит уровень локальности, например: PROCESS_LOCAL -> NODE_LOCAL то же самое произойдет сновый пониженный уровень до тех пор, пока не будут выполнены требуемые спецификации ресурсов.С другой стороны, один из способов обновить состоит в том, чтобы добавить больше ресурсов, например: добавить нового исполнителя.Найденные тесты здесь (строка 915) демонстрируют этот сценарий.Значение по умолчанию - 3 с. Если вы считаете, что вам следует уделять больше времени своим задачам, вы можете увеличить это значение, хотя не рекомендуется (может неэффективно увеличить время простоя Spark).
  3. В случае, если ваши данные находятся за пределами кластера Spark, тогда уровень локальности будет установлен на ЛЮБОЙ.

Мой последний совет по улучшению локальности - информировать Spark о расположении разделов с помощьюrepartition() + persist() or cache().

Примечание: постоянство вступит в силу после первого вызова действия.

Полезные ссылки:

https://www.waitingforcode.com/apache-spark/spark-data-locality/read

http://www.russellspitzer.com/2017/09/01/Spark-Locality/

https://github.com/apache/spark/blob/0bb716bac38488bc216fbda29ce54e93751e641b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

...