Разделить Spark DataFrame на основе значений в существующем столбце на выбранное количество разделов - PullRequest
0 голосов
/ 11 февраля 2019

Я хотел бы разбить Spark DataFrame на четное количество разделов на основе столбца индекса перед записью в файл.Я хотел бы контролировать количество создаваемых разделов в зависимости от размера DataFrame, а затем использовать его при записи в файл Parquet с помощью partitionBy.

Имея пример DataFrame:

 i     b
 0    11
 1     9
 2    13
 3     2
 4    15
 5     3
 6    14
 7    16
 8    11
 9     9
 10   17
 11   10

Если предположить, что я хотел бы создать 4 раздела на основе значений в столбце i, тогда разделы будут соответствовать значениям, присвоенным столбцу g:

g    i     b
0    0    11
0    1     9
0    2    13
1    3     2
1    4    15
1    5     3
2    6    14
2    7    16
2    8    11
3    9     9
3   10    17
3   11    10

Какой предпочтительный способ сделать этов искре?

1 Ответ

0 голосов
/ 12 февраля 2019

Несмотря на то, что документация кажется немного трудной для понимания и делает некоторые предположения по этому вопросу - то есть она хотела бы, чтобы 4 или, скорее, N файлов (?) Были выведены с восходящим подходом к идентификатору, указанному в столбце «i», настоящиммой собственный адаптированный пример Spark 2.4 , который берет 20 записей и разбивает их на 4 равномерно распределенных раздела, а затем записывает их.Давайте пойдем:

val list = sc.makeRDD((1 to 20)).map((_, 1,"2019-01-01", "2019-01-01",1,2,"XXXXXXXXXXXXXXXXXXXXXXXXXX"))

val df = list.toDF("customer_id", "dummy", "report_date", "date", "value_1", "value_2", "dummy_string")
df.show(false)

Отображение только нескольких записей:

+-----------+-----+-----------+----------+-------+-------+--------------------------+
|customer_id|dummy|report_date|date      |value_1|value_2|dummy_string              |
+-----------+-----+-----------+----------+-------+-------+--------------------------+
|1          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|2          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|3          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|4          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|5          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|6          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|7          |1    |2019-01-01 |2019-01-01|1      |2      |XXXXXXXXXXXXXXXXXXXXXXXXXX|
...

Затем - включая некоторую дополнительную сортировку для хорошей меры - это не обязательно, работа со всеми форматами:

df.repartitionByRange(4, $"customer_id")
  .sortWithinPartitions("customer_id", "date", "value_1")
  .write
  .parquet("/tmp/SOQ6")

Это дало 4 файла, как на картинке ниже:

enter image description here

Вы можете увидеть 4 файла, и первые и последние наименования частей очевидны,Выполнение:

val lines = spark.read.parquet("/tmp/SOQ6/part-00000-tid-2518447510905190948-a81455f6-6c0b-4e02-89b0-57dfddf1fb97-1200-c000.snappy.parquet")
val words = lines.collect
lines.count

показывает 5 записей и содержимое, упорядоченное последовательно в соответствии с фреймом данных.

lines: org.apache.spark.sql.DataFrame = [customer_id: int, dummy: int ... 5 more fields]
 words: Array[org.apache.spark.sql.Row] = Array([1,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [2,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [3,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [4,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [5,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX])
res11: Long = 5

Выполнено это для всех файлов, но отображается только одна.

Заключительные комментарии

Является ли это хорошей идеей, это отдельная история, например, подумайте о не транслируемых соединениях, которые являются проблемой.

Кроме того, я бы, очевидно, не жестко закодировал 4, а применил бы некоторую формулу для N, которую нужно применить к partitionByRange!Например:

val N = some calculation based on counts in DF and your cluster 
val df2 = df.repartition(N, $"c1", $"c2")

Вы должны протестировать DF Writer, поскольку документация не совсем понятна.

Проверено на кластере EMR с 2M записями, 4 файлами, а также с точки зрения вывода.

...