INPUT: Набор входных данных содержит 10 миллионов транзакций в нескольких файлах, хранящихся в виде паркета. Размер всего набора данных, включая все файлы, варьируется от 6 до 8 ГБ.
ПОСТАНОВКА ЗАДАЧИ: Разделите транзакции на основе идентификаторов клиентов, которые будут создавать одну папку для идентификатора клиента, и каждая папка будет содержать все транзакции, выполненные этим конкретным клиентом.
HDFS имеет жесткое ограничение в 6,4 миллиона на количество подкаталогов в пределах root каталог, который можно создать, используя две последние цифры идентификатора клиента в диапазоне от 00,01,02 ... до 99 для создания каталогов верхнего уровня, и каждый каталог верхнего уровня будет содержать все идентификаторы клиентов, заканчивающиеся указанными c две цифры.
Пример структуры выходного каталога:
00 / cust_id = 100900 / part1.csv 00 / cust_id = 100800 / part33.csv
01 / cust_id = 100801 / part1.csv 03 / cust_id = 100803 / part1.csv
КОД:
// Reading input file and storing in cache
val parquetReader = sparksession.read
.parquet("/inputs")
.persist(StorageLevel.MEMORY_ONLY) //No spill will occur has enough memory
// Logic to partition
var customerIdEndingPattern = 0
while (cardAccountEndingPattern < 100) {
var idEndPattern = customerIdEndingPattern + ""
if (customerIdEndingPattern < 10) {
idEndPattern = "0" + customerIdEndingPattern
}
parquetReader
.filter(col("customer_id").endsWith(idEndPattern))
.repartition(945, col("customer_id"))
.write
.partitionBy("customer_id")
.option("header", "true")
.mode("append")
.csv("/" + idEndPattern)
customerIdEndingPattern = customerIdEndingPattern + 1
}
Конфигурация Spark: Amazon EMR 5.29.0 (Spark 2.4.4 и Had oop 2.8.5 )
1 мастер и 10 подчиненных, каждый из которых имеет 96 vCores и 768 ГБ ОЗУ (экземпляр Amazon AWS R5.24xlarge). Жесткие диски EBS с перебором 3000 IOPS на 30 минут до 40 ведомых устройств (соответственно настраивая конфигеры искры), но все равно те же результаты, выполнение задания занимает более 2 часов (как показано в первом pi c, каждое задание занимает более минуты, а l oop выполняется 99 раз ). Также чтения от удаленных исполнителей почти не существуют (что хорошо), большинство из них являются локальными процессами.
Кажется, что раздел работает нормально (см. Второй пи c), получая 5 блоков RDD в экземпляр и 5 задач, выполняющихся постоянно (каждый экземпляр имеет 5 ядер и 19 экземпляров на подчиненный узел). G C также оптимизировано.
Каждое задание с разделением, как записано в то время, как l oop занимает минуту или более.
МЕТРИКА:
Пример продолжительности нескольких работ. Всего у нас 99 заданий
С разделом все в порядке
Сводка по 1 заданию, в основном один раздел, при выполнении Сводка по нескольким экземплярам после полного завершения задания, поэтому блоки СДР равны нулю, а первая строка - драйвер.
Итак, вопрос в том, как оптимизировать его больше и почему он не расширяется? Есть ли лучший способ go об этом? Я уже достиг максимальной производительности? Если у меня есть доступ к большему количеству ресурсов с точки зрения аппаратного обеспечения, могу ли я что-то сделать лучше? Любые предложения приветствуются.