Спарк объединяется по количеству объектов в каждом разделе - PullRequest
0 голосов
/ 18 декабря 2018

Мы начинаем экспериментировать с искрой в нашей команде.После того, как мы сократили объем работы в Spark, мы хотели бы записать результат в S3, однако мы бы хотели избежать сбора результата искры.На данный момент мы пишем файлы в Spark forEachPartition RDD, однако это привело к множеству маленьких файлов.Мы хотели бы иметь возможность объединять данные в пару файлов, разделенных по количеству объектов, записанных в файл.Так, например, наши общие данные составляют 1 МБ объектов (это константа), мы хотели бы создать файл объектов 400 КБ, а наш текущий раздел создает файл объектов размером около 20 КБ (это сильно варьируется для каждой работы).В идеале мы хотим создать 3 файла, каждый из которых содержит 400 КБ, 400 КБ и 200 КБ вместо 50 файлов с объектами 20 КБ

У кого-нибудь есть хорошее предложение?

Мой мыслительный процесс заключается в том, чтобы каждый раздел обрабатывалсяв какой индекс он должен записывать это, предполагая, что каждый раздел будет грубо производить одинаковое количество объектов.Так, например, раздел 0 запишет в первый файл, а раздел 21 запишет во второй файл, поскольку он будет предполагать, что начальный индекс для объекта - 20000 * 21 = 42000, что больше, чем размер файла.Раздел 41 запишет в третий файл, так как он больше 2 * ограничения размера файла.Это не всегда приводит к идеальному ограничению размера файла в 400 КБ, хотя это скорее приближение.

Я понимаю, что есть объединение, но, насколько я понимаю, объединение - это уменьшение количества разделов на основе количествараздел хотел.Что я хочу, чтобы объединить данные на основе количества объектов в каждом разделе, есть ли хороший способ сделать это?

Ответы [ 2 ]

0 голосов
/ 12 января 2019

Что вы хотите сделать, это переразбить файлы на три раздела;данные будут разделены примерно на 333 тыс. записей на раздел.Раздел будет приблизительным, он не будет точно 333,333 на раздел.Я не знаю, как получить желаемый раздел 400k / 400k / 200k.

Если у вас есть DataFrame `df ', вы можете перераспределить на n разделов как

df.repartition(n)

Поскольку вам нужно максимальное количество или записей на раздел, я бы порекомендовал это (вы нея не могу указать Scala или pyspark, поэтому я собираюсь использовать Scala; вы можете сделать то же самое в pyspark):

val maxRecordsPerPartition = ???
val numPartitions = (df.count() / maxRecordsPerPartition).toInt + 1
df
    .repartition(numPartitions)
    .write
    .format('json')
    .save('/path/file_name.json')

Это гарантирует, что ваши разделы будут меньше, чем maxRecordsPerPartition.

0 голосов
/ 12 января 2019

Мы решили просто указать количество генерируемых файлов и убедиться, что каждый файл содержит менее 1 миллиона позиций

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...