Есть ли способ изменить количество выходных строк в папке раздела? - PullRequest
0 голосов
/ 04 декабря 2018

У меня большой набор данных о конечном результате, и эти данные неравномерно распределены по моей заинтересованной колонке.Когда я пишу это напрямую путем разбиения, каждый раздел имеет такое же количество файлов, что и spark.sql.shuffle.partitions.Это приводит к тому, что каждый файл в переполненном разделе становится очень большим (в ГБ), но в некоторых других разделах размер файла действительно мал (даже в КБ).Есть ли способ изменить количество файлов на раздел?

Пример:

+----------------------------+----------+
| number of rows in category | category |
+----------------------------+----------+
| 50000000000                |    A     |
| 200000                     |    B     |
| 30000                      |    C     |
+----------------------------+----------+

Если я сделаю:

df.write.partitionBy("category").parquet(output_dir)

Размеры файлов в папке "A"большой, в то время как те, что в" B "и" C ", маленькие.

Ответы [ 2 ]

0 голосов
/ 05 декабря 2018

Я бы предложил вызвать df.repartition(NUM_PARTITIONS) на фрейме данных, чтобы равномерно распределить строки по разделам.В вашем случае для категории = A строки будут распределены по большему количеству разделов, скажем, по сравнению с категорией C. После перераспределения, когда вы вызываете write.partitionBy("category"), для категории A, поскольку она была распределена по большему количеству разделов, большее количествофайлы будут записаны (один файл на раздел категории A).

NUM_PARTITIONS может быть динамическим, как NUM_PARTITIONS = df.count()%ROWS_PER_PARTITION.Вы можете решить, сколько ROWS_PER_PARTITION в зависимости от размера байта в строке.

NUM_PARTITIONS = 100 
df.repartition(NUM_PARTITIONS).write.partitionBy("category").parquet(output_dir)

Если вы хотите проверить, как распределены разделы, вы можете использовать это

import pyspark.sql.functions as f
df.withColumn("partition_id",f.spark_partition_id()).groupBy("partition_id").count().show()

. Для более подробных обсуждений вы увидите это Spark SQL - Разница между df.repartition и DataFrameWriter partitionBy?

0 голосов
/ 04 декабря 2018

Попробуйте переразметить фрейм данных, используя несколько столбцов (если это возможно и логично для ваших данных).

Пример:

df.repartition("category", "<some_other_column_name>").write.partitionBy("category").parquet(output_dir)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...