Я не думаю, что HashPartitioner собирается помещать записи с одинаковым ключом в два разных раздела в любой ситуации. javadoc для разделителя четко говорит следующее:
Объект, который определяет, как элементы в паре ключ-значение RDD разделяются по ключу. Сопоставляет каждый ключ с идентификатором раздела, от 0 до numPartitions - 1.
Обратите внимание, что разделитель должен быть детерминированным, то есть он должен возвращать один и тот же идентификатор раздела при одинаковом ключе раздела.
Если размещение записей с одинаковыми ключами в одном и том же разделе не является обязательным для вас, возможно, вы можете попробовать следующее без реализации пользовательского разделителя.
- Допустим, вы хотите записать кадр данных в1000 файлов.
- Добавьте новый столбец к вашему фрейму данных со случайными целыми числами от 0 до 999.
_num_output_files = 1000
df = df.withColumn('rand', round(rand() * (_num_output_files-1), 0).astype(IntegerType()))
WLG, давайте предположим, что столбец rand - ваш i-й столбец вкадр данных. Нам нужно использовать этот столбец в качестве ключа для rdd, а затем разделить по этому ключу. Это обеспечит практически равномерное распределение данных по всем разделам. Следующий фрагмент кода достигнет этого.
tmp_rdd = df.rdd.keyBy(lambda x: x[i-1])
tmp_rdd = tmp_rdd.partitionBy(_num_output_files, lambda x: x)
df_rdd = spark.createDataFrame(tmp_rdd.map(lambda x: x[1]))
Примечание : Это удобный фрагмент кода для проверки текущего распределения записей по разделам в Pyspark: print('partition distrib: ' + str(df_rdd.rdd.glom().map(len).collect()))
. После вызова предыдущего набора методов вы должны увидеть примерно одинаковые номера в каждом разделе.