Настройка приложения AWS EMR Spark для groupBy с> 300 000 групп - PullRequest
0 голосов
/ 04 марта 2019

Шаги:

1) Считать файл с S3 (140-200 ГБ).

2) Затем я применяю функцию filter (), чтобы удалить большинство карт записей (будет удалено около 70%).

3) Для остальных записей (около 40-50 ГБитого), используйте map () для создания JavaRdd MyObj.

4) После этого я использую group by для группировки объектов по полю sesisonId.В общей сложности 40-50 ГБ я ожидаю получить около 300 000 групп в groupBy.

5) Затем для каждой группы я выполняю processSingeGroupOfEvents () (эта функция выглядит следующим образом: она принимает Interable, а затем выполняетнесколько простых сотрудников для каждого Iterable [из 300 000]).

6) После этого я использую coalesce () и saveAsTextFile для вывода результатов на S3 (размер выходного файла будет около 1-2 ГБ).

Псевдокод:

 JavaRDD<MyObj> eventsCollection = context.textFile(input)
                .filter(data -> applySomeFilter(data))      // This will filter ~70% of records 
                .map(data ->  createMyObjFromData(data));

JavaPairRDD<String, Iterable<MyObj>> eventsCollectionGroupedById = eventsCollection
                .groupBy(x -> x.getSessionId())
                .persist(StorageLevel.MEMORY_AND_DISK());;

JavaPairRDD<String, String> groupedByIdResults = eventsCollectionGroupedById
                .mapValues(iterable -> processSingeGroupOfEvents(iterable, additionalVariable1, additionalVariable2 ));

groupedByIdResults
                .coalesce(1) // Union all partitions into single one (in order to avoid multiple output files)
                .map(data ->  data._2()) 
                .saveAsTextFile(outputS3Location);

Мои текущие конфигурации для приложения AWS EMR Spark:

  • 4 типа экземпляра ядра r3.8xlarge
  • и 1 из r3.8xlarge для мастераузел
  • метка выпуска Emr: emr-5.11.0
  • maximizeResourceAllocation = true.
  • версия Spark 1.4 (я не могу обновить Spark до последней версии прямо сейчас)

Пока выполнение такой работы занимает около 30-50 минут.Однако в будущем я ожидаю, что размер входных данных увеличится вдвое (~ 300 ГБ данных => ~ 600 000 групп для объединения по идентификатору сеанса)

Как я могу оценить, что если мои данные удвоятсямой кластер сможет справиться с такой нагрузкой?Кроме того, иногда я получаю сообщение о том, что в моем регионе превышен предел квоты для типа экземпляра r3.8xlarge, поэтому я беспокоюсь, что, если я добавлю больше оборудования, эта проблема возникнет чаще.

ОБНОВЛЕНИЕ: Метод processSingeGroupOfEvents () выполняет итерацию по группе событий (Iterable) с одним и тем же идентификатором сеанса и выполняет некоторые сложные вычисления (например, подсчет промежуточного итога, поиск максимального числа элементов в группе, разметка времени и т. Д.).Он возвращает разделенную запятыми строку с агрегированными значениями для определенного идентификатора сеанса.

1 Ответ

0 голосов
/ 04 марта 2019

processSingeGroupOfEvents уменьшает объем данных?Если да, то замена groupBy и mapValues на aggregateByKey может значительно уменьшить объем данных, которые необходимо перетасовать.

После этого я рекомендую следовать общему руководству по настройке Spark https://spark.apache.org/docs/latest/tuning.html.Проверьте Spark Web UI на время сбора мусора.EMR поставляется с Ganglia, который можно использовать для мониторинга отдельных узлов в кластере.Равномерно ли используются процессор и память между узлами?

Наконец, вы можете выполнить задание с текущим объемом данных, но вдвое уменьшить количество узлов в кластере.Если работа завершается, но занимает вдвое больше времени, это хороший признак того, что нагрузка распределяется равномерно и может увеличиваться.Если он дает сбой или существенно не замедляется, в работе возникают серьезные узкие места.

...