Ограничение максимального размера раздела данных - PullRequest
0 голосов
/ 27 августа 2018

Когда я записываю фрейм данных, скажем, в csv, файл .csv создается для каждого раздела. Предположим, я хочу ограничить максимальный размер каждого файла, скажем, до 1 МБ. Я мог бы сделать запись несколько раз и увеличить аргумент для перераспределения каждый раз. Есть ли способ, которым я могу заранее рассчитать, какой аргумент использовать для перераспределения, чтобы максимальный размер каждого файла был меньше указанного размера.

Я предполагаю, что могут быть патологические случаи, когда все данные оказываются в одном разделе. Поэтому сделайте более слабое предположение, что мы хотим только, чтобы средний размер файла был меньше определенного объема, скажем, 1 МБ.

Ответы [ 2 ]

0 голосов
/ 30 августа 2018
    val df = spark.range(10000000)
    df.cache     
    val catalyst_plan = df.queryExecution.logical
    val df_size_in_bytes = spark.sessionState.executePlan(catalyst_plan).optimizedPlan.stats.sizeInBytes

df_size_in_bytes: BigInt = 80000000

Лучшим решением было бы взять 100 записей, оценить их размер и применить ко всем строкам, как в примере выше

0 голосов
/ 30 августа 2018

1. Решение с одним кадром данных

Я пытался найти какую-то умную идею, которая не убивала бы кластер в одно и то же время, и единственное, что мне пришло в голову, было:

  1. Рассчитать размер сериализованной строки
  2. Получи нет. строк в вашем DataFrame
  3. Передел, разделив с ожидаемым размером
  4. Должен работать?

Код должен выглядеть примерно так:

val df: DataFrame = ??? // your df
val rowSize = getBytes(df.head)
val rowCount = df.count()
val partitionSize = 1000000 // million bytes in MB?
val noPartitions: Int = (rowSize * rowCount / partitionSize).toInt
df.repartition(noPartitions).write.format(...) // save to csv

// just helper function from https://stackoverflow.com/a/39371571/1549135
def getBytes(value: Any): Long = {
  val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(stream)
  oos.writeObject(value)
  oos.close
  stream.toByteArray.length
}

Хотя мой первый выбор заключался в расчете размера байта каждой строки, это было бы ужасно неэффективно. Итак, если ваш размер данных в каждой строке сильно отличается по размеру, я бы сказал, что это решение будет работать. Вы также можете рассчитать каждый n-й размер строки. Вы поняли.

Кроме того, я просто «надеюсь», что Long будет достаточно большим, чтобы поддерживать ожидаемый размер для вычисления noPartitions. Если нет (если у вас много строк), возможно, было бы лучше изменить порядок операций, например ::1010

val noPartitions: Int = (rowSize / partitionSize * rowCount).toInt

Опять же, это всего лишь черновая идея без знания предметной области ваших данных.

2. Межсистемное решение

Проходя по документам apache-spark , я нашел интересное межсистемное решение:

spark.sql.files.maxPartitionBytes который устанавливает:

Максимальное количество байтов для упаковки в один раздел при чтении файлов.

Значением по умолчанию является 134217728 (128 MB).

Итак, я полагаю, вы можете установить его на 1000000 (1MB), и он будет иметь постоянный эффект на DataFrames. Однако слишком маленький размер раздела может сильно повлиять на вашу производительность!

Вы можете настроить его при SparkSession создании:

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.sql.files.maxPartitionBytes", 100000)
  .getOrCreate()

Все вышеперечисленное действительно только в том случае, если (я правильно помню и) CSV-файл разделен с тем же количеством файлов, что и разделы DataFrame.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...