СДР в кеше искры рассчитывается n раз - PullRequest
0 голосов
/ 14 февраля 2019

У меня проблема с приложением Spark.Вот упрощенная версия моего кода:

def main(args: Array[String]) {
    // Initializing spark context
    val sc = new SparkContext()
    val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
    System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)

    // Getting files from TGZ archives
    val archivesRDD: RDD[(String,PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz") // This returns an RDD of tuples containing (filename, inpustream)
    val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
        logger.debug("Getting files from archive : "+tgzStream._1)
        utils.getFilesFromTgzStream(tgzStream._2)
    })

    // We run the same process with 3 different "modes"
    val modes = Seq("mode1", "mode2", "mode3")

    // We cache the RDD before
    val nb = filesRDD.cache().count()
    logger.debug($nb + " files as input")

    modes.map(mode => {
        logger.debug("Processing files with mode : " + mode)
        myProcessor.process(mode, filesRDD)
    })

    filesRDD.unpersist() // I tried with or without this

    [...]
}

Сгенерированные журналы (например, с 3-мя архивами в качестве входных данных):

Получение файлов из архива: a

Получение файлов из архива: b

Получение файлов из архива: c

3 файла на входе

Обработка файлов в режиме: mode1

Получениефайлы из архива: a

Получение файлов из архива: b

Получение файлов из архива: c

Обработка файлов в режиме: mode2

Получение файлов изархив: a

Получение файлов из архива: b

Получение файлов из архива: c

Обработка файлов в режиме: mode3

Получение файлов из архива:a

Получение файлов из архива: b

Получение файлов из архива: c

Конфигурация My Spark:

  • Версия: 1.6.2
  • Исполнители: 20 x 2CPU x 8Go RAM
  • Объем дополнительной памяти на одного исполнителя: 800Mo
  • Драйвер: 1CPU x 8Go RAM

Из этих журналов я понимаю, что извлечение файлов выполняется 4 раза в экземпляре одного!Это, очевидно, приводит меня к проблемам с пространством кучи и к потере производительности ...

Я что-то не так делаю?

РЕДАКТИРОВАТЬ: Я также пытался использовать modes.foreach(...) вместокарта, но ничего не изменилось ...

Ответы [ 2 ]

0 голосов
/ 28 февраля 2019

Хорошо, после МНОГО тестов я наконец-то решил эту проблему.На самом деле было 2 проблемы:

  1. Я недооценил размер входных данных: Функции Спарка cache или persist неэффективны, если СДР слишком великчтобы хранить полностью в 60% общей памяти, я знал это, но думал, что мои входные данные были не такими большими, но на самом деле мой RDD составлял 80 ГБ.Но 60% моей памяти (которая составляет 160 ГБ) по-прежнему превышает 80 ГБ, так что же случилось?Ответ на вопрос № 2 ...

  2. Мои разделы были слишком большими: Где-то в моем коде число разделов моего RDD было установлено равным 100,поэтому у меня было 100 разделов по 1,6 ГБ каждая.Проблема состоит в том, что мои данные состоят из строк по десятки мегабайт каждый, поэтому мои разделы не были заполнены, а 10 ГБ используемой памяти фактически содержали только 7 или 8 ГБ реальных данных.

Чтобы решить эти проблемы, мне пришлось использовать persist(StorageLevel.MEMORY_SER), который увеличивает время вычислений, но значительно сокращает использование памяти ( в соответствии с этим тестом ) И устанавливает номер раздела в 1000 (согласно документации Spark, которая рекомендует разделы~ 128 МБ)

0 голосов
/ 19 февраля 2019

Вы пытались передать результат modes.map в конструктор List (т.е. List(modes.map{ /*...*/}))?Иногда (я не уверен, когда) коллекции Scala отображают ленивые сопоставления, поэтому, если они не оцениваются до тех пор, пока искра не удалит кэши, придется пересчитать.

...