1. Решение с одним кадром данных
Я пытался найти какую-то умную идею, которая не убивала бы кластер в одно и то же время, и единственное, что мне пришло в голову, было:
- Рассчитать размер сериализованной строки
- Получи нет. строк в вашем DataFrame
- Передел, разделив с ожидаемым размером
- Должен работать?
Код должен выглядеть примерно так:
val df: DataFrame = ??? // your df
val rowSize = getBytes(df.head)
val rowCount = df.count()
val partitionSize = 1000000 // million bytes in MB?
val noPartitions: Int = (rowSize * rowCount / partitionSize).toInt
df.repartition(noPartitions).write.format(...) // save to csv
// just helper function from https://stackoverflow.com/a/39371571/1549135
def getBytes(value: Any): Long = {
val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(stream)
oos.writeObject(value)
oos.close
stream.toByteArray.length
}
Хотя мой первый выбор заключался в расчете размера байта каждой строки, это было бы ужасно неэффективно. Итак, если ваш размер данных в каждой строке сильно отличается по размеру, я бы сказал, что это решение будет работать. Вы также можете рассчитать каждый n-й размер строки. Вы поняли.
Кроме того, я просто «надеюсь», что Long
будет достаточно большим, чтобы поддерживать ожидаемый размер для вычисления noPartitions
. Если нет (если у вас много строк), возможно, было бы лучше изменить порядок операций, например ::1010
val noPartitions: Int = (rowSize / partitionSize * rowCount).toInt
Опять же, это всего лишь черновая идея без знания предметной области ваших данных.
2. Межсистемное решение
Проходя по документам apache-spark , я нашел интересное межсистемное решение:
spark.sql.files.maxPartitionBytes
который устанавливает:
Максимальное количество байтов для упаковки в один раздел при чтении файлов.
Значением по умолчанию является 134217728 (128 MB)
.
Итак, я полагаю, вы можете установить его на 1000000 (1MB)
, и он будет иметь постоянный эффект на DataFrames
. Однако слишком маленький размер раздела может сильно повлиять на вашу производительность!
Вы можете настроить его при SparkSession
создании:
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.sql.files.maxPartitionBytes", 100000)
.getOrCreate()
Все вышеперечисленное действительно только в том случае, если (я правильно помню и) CSV-файл разделен с тем же количеством файлов, что и разделы DataFrame.