Scala - Spark Repartition не дает ожидаемых результатов - PullRequest
0 голосов
/ 12 июля 2020

Я хочу переразбить мой фрейм данных Spark на основе столбца X. Скажем, столбец X имеет 3 различных значения (X1, X2, X3). Количество различных значений может быть разным.

Я хочу, чтобы один раздел содержал записи только с одним значением X. ie. Мне нужны 3 раздела, в 1 из которых есть записи, где X = X1, другие с X = X2 и последние с X = X3.

У меня есть уникальные значения X из фрейма данных по запросу

val uniqueList = DF.select("X").distinct().map(x => x(0).toString).collect() 

который дает список уникальных значений правильно.

И для повторного разбиения я делаю

DF = DF.repartition(uniqueList.length, col('X'))

Однако мои разделы в DF не появляются, как ожидалось. Данные распределяются неправильно, так как один раздел пуст, второй содержит записи с X1, а третий раздел содержит записи с X2 и X3.

Может ли кто-нибудь помочь, если мне что-то не хватает.

EDIT :

Мой столбец X может иметь различное количество уникальных значений. Он может иметь 3 или 3000 уникальных значений. Если я сделаю ниже

DF = DF.repartition(col('X'))

, я получу только 200 разделов, так как это значение по умолчанию для spark. sql .shuffle.partitions. Таким образом, я даю номер раздела

Если есть 3000 уникальных значений X, тогда я хочу переразбить мой DF таким образом, чтобы было 3000 разделов, и каждый раздел содержал записи для одного конкретного значения X. Итак что я могу запустить mapPartition и обрабатывать каждый раздел параллельно.

Ответы [ 3 ]

2 голосов
/ 13 июля 2020

Перераспределение основано на разбиении ha sh (возьмите код ha sh ключа разделения по модулю количества разделов), поэтому наличие у каждого раздела только одного значения является чисто случайным.

Если вы можете сопоставить каждый ключ разделения с уникальным Int в диапазоне от нуля до (количество уникальных значений - 1), поскольку код ha sh для Int в Scala является этим целым числом, это обеспечит что если существует как минимум столько разделов, сколько уникальных значений, ни один раздел не имеет нескольких отдельных значений ключа разделения.

Тем не менее, придумывание значений для таких Int s по своей сути не распараллеливается и требует либо последовательного сканирования, либо предварительного знания отдельных значений.

Вероятно, вероятность того, что конкретное значение хешируется в конкретный раздел ( n разделов), составляет 1 / п . Поскольку n увеличивается относительно количества различных значений, вероятность того, что ни один раздел не имеет более одного отличного значения, увеличивается (на пределе, если у вас может быть 2 ^ 32 раздела, почти все из них будут пустыми но фактическая коллизия ha sh по-прежнему гарантирует наличие нескольких различных значений в разделе). Поэтому, если вы можете допускать пустые разделы, выбор количества разделов, которое значительно превышает количество различных значений, снизит вероятность получения неидеального результата.

0 голосов
/ 12 июля 2020

Может ли ваш столбец X содержать нулевые значения? Затем Spark пытается создать для этого один раздел. Поскольку вы также указываете количество разделов как int, возможно, Spark пытается сжать sh X2 и X3. Таким образом, вы можете попробовать две вещи - просто указать имя столбца для исправления (еще один дополнительный раздел) или попытаться удалить нулевые значения из X, если они существуют.

0 голосов
/ 12 июля 2020

Это работает?

val repartitionedDF = DF.repartition(col("X"))

Вот пример из сообщения блога от .

Данные:

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China

Код:

df
  .repartition(col("country"))
  .write
  .partitionBy("country")
  .parquet(outputPath)

Вывод файловой системы:

partitioned_lake1/
  country=Argentina/
    part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=China/
    part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=Russia/
    part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
...