Механизм столкновения Spark HashPartitioner? - PullRequest
0 голосов
/ 31 октября 2019

Кто-нибудь знает, есть ли в Spark HashPartitioner автоматический механизм столкновения для назначения ключа новому разделу? Т.е. если у меня есть очень искаженные данные, в которых один ключ содержит много записей, и к

partition = hash (key)% num_partitions

я получу много записей втот же раздел, который не будет в памяти. В этом случае у HashPartitioner есть что-то вроде проверки назначения записей новому разделу или нет? Если это не так, нужно ли мне создавать настраиваемый разделитель для работы с перекосом ключа? Большое спасибо.

1 Ответ

0 голосов
/ 31 октября 2019

Я не думаю, что HashPartitioner собирается помещать записи с одинаковым ключом в два разных раздела в любой ситуации. javadoc для разделителя четко говорит следующее:

Объект, который определяет, как элементы в паре ключ-значение RDD разделяются по ключу. Сопоставляет каждый ключ с идентификатором раздела, от 0 до numPartitions - 1.

Обратите внимание, что разделитель должен быть детерминированным, то есть он должен возвращать один и тот же идентификатор раздела при одинаковом ключе раздела.

Если размещение записей с одинаковыми ключами в одном и том же разделе не является обязательным для вас, возможно, вы можете попробовать следующее без реализации пользовательского разделителя.

  1. Допустим, вы хотите записать кадр данных в1000 файлов.
  2. Добавьте новый столбец к вашему фрейму данных со случайными целыми числами от 0 до 999.
    _num_output_files = 1000
    df = df.withColumn('rand', round(rand() * (_num_output_files-1), 0).astype(IntegerType()))
    
  3. WLG, давайте предположим, что столбец rand - ваш i-й столбец вкадр данных. Нам нужно использовать этот столбец в качестве ключа для rdd, а затем разделить по этому ключу. Это обеспечит практически равномерное распределение данных по всем разделам. Следующий фрагмент кода достигнет этого.

    tmp_rdd = df.rdd.keyBy(lambda x: x[i-1])
    tmp_rdd = tmp_rdd.partitionBy(_num_output_files, lambda x: x)
    df_rdd = spark.createDataFrame(tmp_rdd.map(lambda x: x[1]))
    

Примечание : Это удобный фрагмент кода для проверки текущего распределения записей по разделам в Pyspark: print('partition distrib: ' + str(df_rdd.rdd.glom().map(len).collect())). После вызова предыдущего набора методов вы должны увидеть примерно одинаковые номера в каждом разделе.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...