Как применить модель большого питона к pyspark-dataframe? - PullRequest
6 голосов
/ 15 мая 2019

У меня есть:

  • Большой кадр данных (формат паркета, 100 000 000 строк, размер 4,5 ТБ), содержащий некоторые данные (функции)
  • Несколько огромных моделей ML (каждаязанимает 5-15 ГБ ОЗУ)
  • Кластер Spark (AWS EMR), типовая конфигурация узла - 8 ЦП, 32 ОЗУ, при необходимости можно изменить.

Я хочу применить ихиспользуя PySpark, но я всегда получаю некоторые проводные ошибки, такие как:

  • OOM
  • Случайные таймауты (узлы не возвращают никакого результата) -> узел убит менеджером YARN

Я обычно использовал код, такой как

def apply_model(partition):
    model = load(...)  # load model only when apply this function to avoid serialization issue
    for row in partition:
        yield model.infer(row)

или

def apply_model(partition):
    model = load(...)  # load model only when apply this function to 
    yield from model.infer(partition)

, и примените его, используя

df.select(...).rdd.mapPartitions(apply_model)

Я могуМодель 't broadcast, по причинам сериализации.

Вопрос - как применить модель на основе большого python / any-non-jvm для разжигания фрейма данных и избежания искровых исключений?

Ответы [ 4 ]

3 голосов
/ 23 мая 2019

Вот некоторые дополнительные предложения, которые могут помочь повысить производительность вашей работы:

  • Первое изменение, которое я хотел бы сделать, - это уменьшить размер раздела.Если я правильно понял на данный момент у вас есть входные данные 4,5 ТБ.Это означает, что если у вас есть 1000 разделов, то вы отправите 4,5 ГБ на раздел каждому исполнителю!Этот размер считается вполне большим , вместо этого я бы попытался сохранить размер раздела между 250-500 МБ.Примерно в вашем случае это означало бы ~ 10000 (4,5 ТБ / 500 МБ) разделов.

  • Увеличьте параллелизм, добавив больше исполнителей.Это увеличит уровень данных locality и, следовательно, сократит время выполнения.В идеале у вас должно быть 5 баллов для каждого исполнителя и два исполнителя (если возможно) для каждого узла кластера.Максимальное количество ядер на исполнителя не должно превышать 5, поскольку это может привести к узким местам ввода / вывода (когда / если используется дисковое хранилище).

  • Что касается памяти, предложения от @rluta, я думаю, более чем достаточны.В целом, слишком большие значения для памяти исполнителя могут оказать негативное влияние на время Java GC, поэтому верхний предел 10 ГБ должен быть идеальным значением для spark.executor.memory.

2 голосов
/ 20 мая 2019

Когда вы применяете функцию python к разделу с mapPartitions, Spark оценивает его в сопутствующем процессе python для каждого исполнителя на основе JVM.

Обычно память, используемая процессом python, мала ихорошо в пределах настройки YARN memoryOverhead, используемой в EMR.В вашем конкретном случае это предположение не выполняется, поскольку процесс python должен удерживать вашу большую модель в памяти, поэтому вам необходимо адаптировать свою конфигурацию.

Если вы используете 8 ЦП / 32 ГБ ОЗУ для каждого хоста исполнителя, выв качестве базовой конфигурации можно попробовать следующее:

spark.executor.cores=6
spark.executor.memory=8G
spark.executor.pyspark.memory=20G

Обратите внимание, что установка spark.executor.pyspark.memory наложит жесткое ограничение на использование памяти вашим процессом python (по умолчанию оно не ограничено), поэтому вам, возможно, придется поэкспериментировать снайдите ограничение, которое будет соответствовать вашему процессу.

Альтернативная конфигурация - сохранить память процесса pyspark неограниченной, но зарезервировать достаточно YARN memoryOverhead для его размещения, например:

spark.executor.cores=6
spark.executor.memory=8G
spark.executor.memoryOverhead=22G
0 голосов
/ 21 мая 2019

Если я правильно понял ваш вопрос, значит, у вас уже есть модель, и вам нужно применить ее к 4,5 ТБ данных. Если это так, то вы можете обрабатывать его как поток. Вы делите файл на управляемый размер и затем предоставляете каталог для чтения и обработки в Spark Stream.

5/23: Согласно моему пониманию в mapPartition, вы полностью читаете данные, а затем разделяете их, и поэтому существует вероятность переполнения памяти. Где, как в потоке, вы должны создать мини-пакет входных данных, а затем обрабатывать этот мини-пакет за один раз. Справка: https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#basic-concepts

Если вы можете разбить файл больших данных на более мелкий, то, как указано в приведенной ниже ссылке, вы сможете обрабатывать их по одному или кучу за раз, пока большие данные не будут полностью использованы. ссылка: https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#input-sources

0 голосов
/ 16 мая 2019

Вписываются ли ваши разделы в память одного исполнителя? Вы можете попытаться увеличить количество разделов и посмотреть, улучшится ли ситуация:

df.select(...).repartition(1000).rdd.mapPartitions(apply_model)

Подтвердите, что это улучшение, изучив показатели Spark UI, такие как:

  • Размер ввода / записи
  • Размер записи в случайном порядке / записи
  • Время GC

Сравните значения Median, 75th percentile и Max, чтобы определить, не перекошены ли ваши данные.

...