Шаги:
1) Считать файл с S3 (140-200 ГБ).
2) Затем я применяю функцию filter (), чтобы удалить большинство карт записей (будет удалено около 70%).
3) Для остальных записей (около 40-50 ГБитого), используйте map () для создания JavaRdd MyObj.
4) После этого я использую group by для группировки объектов по полю sesisonId.В общей сложности 40-50 ГБ я ожидаю получить около 300 000 групп в groupBy.
5) Затем для каждой группы я выполняю processSingeGroupOfEvents () (эта функция выглядит следующим образом: она принимает Interable, а затем выполняетнесколько простых сотрудников для каждого Iterable [из 300 000]).
6) После этого я использую coalesce () и saveAsTextFile для вывода результатов на S3 (размер выходного файла будет около 1-2 ГБ).
Псевдокод:
JavaRDD<MyObj> eventsCollection = context.textFile(input)
.filter(data -> applySomeFilter(data)) // This will filter ~70% of records
.map(data -> createMyObjFromData(data));
JavaPairRDD<String, Iterable<MyObj>> eventsCollectionGroupedById = eventsCollection
.groupBy(x -> x.getSessionId())
.persist(StorageLevel.MEMORY_AND_DISK());;
JavaPairRDD<String, String> groupedByIdResults = eventsCollectionGroupedById
.mapValues(iterable -> processSingeGroupOfEvents(iterable, additionalVariable1, additionalVariable2 ));
groupedByIdResults
.coalesce(1) // Union all partitions into single one (in order to avoid multiple output files)
.map(data -> data._2())
.saveAsTextFile(outputS3Location);
Мои текущие конфигурации для приложения AWS EMR Spark:
- 4 типа экземпляра ядра r3.8xlarge
- и 1 из r3.8xlarge для мастераузел
- метка выпуска Emr: emr-5.11.0
- maximizeResourceAllocation = true.
- версия Spark 1.4 (я не могу обновить Spark до последней версии прямо сейчас)
Пока выполнение такой работы занимает около 30-50 минут.Однако в будущем я ожидаю, что размер входных данных увеличится вдвое (~ 300 ГБ данных => ~ 600 000 групп для объединения по идентификатору сеанса)
Как я могу оценить, что если мои данные удвоятсямой кластер сможет справиться с такой нагрузкой?Кроме того, иногда я получаю сообщение о том, что в моем регионе превышен предел квоты для типа экземпляра r3.8xlarge, поэтому я беспокоюсь, что, если я добавлю больше оборудования, эта проблема возникнет чаще.
ОБНОВЛЕНИЕ: Метод processSingeGroupOfEvents () выполняет итерацию по группе событий (Iterable) с одним и тем же идентификатором сеанса и выполняет некоторые сложные вычисления (например, подсчет промежуточного итога, поиск максимального числа элементов в группе, разметка времени и т. Д.).Он возвращает разделенную запятыми строку с агрегированными значениями для определенного идентификатора сеанса.