Вы можете использовать maxRecordsPerFile
во время записи dataframe
.
- Если вам нужно целый фрейм данных для записи 1000 записей в каждом файле, затем используйте
repartition(1)
(or)
, запишите 1000 записей для каждого раздела , используйте .coalesce(1)
Example:
# 1000 records written per file in each partition
df.coalesce(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)
# 1000 records written per file for dataframe 100 files created for 100,000
df.repartition(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)
#or by set config on spark session
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000)
#or
spark.sql("set spark.sql.files.maxRecordsPerFile=1000").show()
df.coalesce(1).write.mode("overwrite").parquet(<path>)
df.repartition(1).write.mode("overwrite").parquet(<path>)
Method-2:
Caluculating number of partitions then repartition the dataframe:
df = spark.range(10000)
#caluculate partitions
no_partitions=df.count()/1000
from pyspark.sql.functions import *
#repartition and check number of records on each partition
df.repartition(no_partitions).\
withColumn("partition_id",spark_partition_id()).\
groupBy(col("partition_id")).\
agg(count("*")).\
show()
#+-----------+--------+
#|partiton_id|count(1)|
#+-----------+--------+
#| 1| 1001|
#| 6| 1000|
#| 3| 999|
#| 5| 1000|
#| 9| 1000|
#| 4| 999|
#| 8| 1000|
#| 7| 1000|
#| 2| 1001|
#| 0| 1000|
#+-----------+--------+
df.repartition(no_partitions).write.mode("overwrite").parquet(<path>)