Spark Partitionby не масштабируется, как ожидалось - PullRequest
5 голосов
/ 11 февраля 2020

INPUT: Набор входных данных содержит 10 миллионов транзакций в нескольких файлах, хранящихся в виде паркета. Размер всего набора данных, включая все файлы, варьируется от 6 до 8 ГБ.

ПОСТАНОВКА ЗАДАЧИ: Разделите транзакции на основе идентификаторов клиентов, которые будут создавать одну папку для идентификатора клиента, и каждая папка будет содержать все транзакции, выполненные этим конкретным клиентом.

HDFS имеет жесткое ограничение в 6,4 миллиона на количество подкаталогов в пределах root каталог, который можно создать, используя две последние цифры идентификатора клиента в диапазоне от 00,01,02 ... до 99 для создания каталогов верхнего уровня, и каждый каталог верхнего уровня будет содержать все идентификаторы клиентов, заканчивающиеся указанными c две цифры.

Пример структуры выходного каталога:

00 / cust_id = 100900 / part1.csv 00 / cust_id = 100800 / part33.csv
01 / cust_id = 100801 / part1.csv 03 / cust_id = 100803 / part1.csv

КОД:

// Reading input file and storing in cache
val parquetReader = sparksession.read
  .parquet("/inputs")
  .persist(StorageLevel.MEMORY_ONLY) //No spill will occur has enough memory

// Logic to partition
var customerIdEndingPattern = 0
while (cardAccountEndingPattern < 100) {
  var idEndPattern = customerIdEndingPattern + ""
  if (customerIdEndingPattern < 10) {
    idEndPattern = "0" + customerIdEndingPattern
  }

  parquetReader
    .filter(col("customer_id").endsWith(idEndPattern))
    .repartition(945, col("customer_id"))
    .write
    .partitionBy("customer_id")
    .option("header", "true")
    .mode("append")
    .csv("/" + idEndPattern)
  customerIdEndingPattern = customerIdEndingPattern + 1
}

Конфигурация Spark: Amazon EMR 5.29.0 (Spark 2.4.4 и Had oop 2.8.5 )

1 мастер и 10 подчиненных, каждый из которых имеет 96 vCores и 768 ГБ ОЗУ (экземпляр Amazon AWS R5.24xlarge). Жесткие диски EBS с перебором 3000 IOPS на 30 минут до 40 ведомых устройств (соответственно настраивая конфигеры искры), но все равно те же результаты, выполнение задания занимает более 2 часов (как показано в первом pi c, каждое задание занимает более минуты, а l oop выполняется 99 раз ). Также чтения от удаленных исполнителей почти не существуют (что хорошо), большинство из них являются локальными процессами.

Кажется, что раздел работает нормально (см. Второй пи c), получая 5 блоков RDD в экземпляр и 5 задач, выполняющихся постоянно (каждый экземпляр имеет 5 ядер и 19 экземпляров на подчиненный узел). G C также оптимизировано.

Каждое задание с разделением, как записано в то время, как l oop занимает минуту или более.

МЕТРИКА:

Пример продолжительности нескольких работ. Всего у нас 99 заданий Duration for each of the jobs(totally 99)

С разделом все в порядке Partition seems okay

Сводка по 1 заданию, в основном один раздел, при выполнении Summary of 1 job Сводка по нескольким экземплярам после полного завершения задания, поэтому блоки СДР равны нулю, а первая строка - драйвер. enter image description here

Итак, вопрос в том, как оптимизировать его больше и почему он не расширяется? Есть ли лучший способ go об этом? Я уже достиг максимальной производительности? Если у меня есть доступ к большему количеству ресурсов с точки зрения аппаратного обеспечения, могу ли я что-то сделать лучше? Любые предложения приветствуются.

1 Ответ

5 голосов
/ 15 февраля 2020

Прикосновение к каждой записи 100 раз очень неэффективно, даже если данные могут быть кэшированы в памяти и не могут быть выгружены вниз по течению. Не говоря уже о сохранении в одиночку, это дорого

Вместо этого вы можете добавить виртуальный столбец

import org.apache.spark.sql.functions.substring

val df = sparksession.read
  .parquet("/inputs")
  .withColumn("partition_id", substring($"customer_id", -2, 2))

и использовать его позже для разбиения

df
  .write
  .partitionBy("partition_id", "customer_id")
  .option("header", "true")
  .mode("append")
  .csv("/")

На избегать множество небольших файлов вы можете сначала перераспределить, используя более длинный суффикс

val nParts: Int = ???
val suffixLength: Int = ???  // >= suffix length used for write partitions

df
  .repartitionByRange(
    nParts,
    substring($"customer_id", -suffixLength, suffixLength)
  .write
  .partitionBy("partition_id", "customer_id")
  .option("header", "true")
  .mode("append")
  .csv("/")

Такие изменения позволят вам обрабатывать все данные за один проход без какого-либо явного кэширования.

...