Несмотря на то, что документация кажется немного трудной для понимания и делает некоторые предположения по этому вопросу - то есть она хотела бы, чтобы 4 или, скорее, N файлов (?) Были выведены с восходящим подходом к идентификатору, указанному в столбце «i», настоящиммой собственный адаптированный пример Spark 2.4 , который берет 20 записей и разбивает их на 4 равномерно распределенных раздела, а затем записывает их.Давайте пойдем:
val list = sc.makeRDD((1 to 20)).map((_, 1,"2019-01-01", "2019-01-01",1,2,"XXXXXXXXXXXXXXXXXXXXXXXXXX"))
val df = list.toDF("customer_id", "dummy", "report_date", "date", "value_1", "value_2", "dummy_string")
df.show(false)
Отображение только нескольких записей:
+-----------+-----+-----------+----------+-------+-------+--------------------------+
|customer_id|dummy|report_date|date |value_1|value_2|dummy_string |
+-----------+-----+-----------+----------+-------+-------+--------------------------+
|1 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|2 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|3 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|4 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|5 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|6 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
|7 |1 |2019-01-01 |2019-01-01|1 |2 |XXXXXXXXXXXXXXXXXXXXXXXXXX|
...
Затем - включая некоторую дополнительную сортировку для хорошей меры - это не обязательно, работа со всеми форматами:
df.repartitionByRange(4, $"customer_id")
.sortWithinPartitions("customer_id", "date", "value_1")
.write
.parquet("/tmp/SOQ6")
Это дало 4 файла, как на картинке ниже:
Вы можете увидеть 4 файла, и первые и последние наименования частей очевидны,Выполнение:
val lines = spark.read.parquet("/tmp/SOQ6/part-00000-tid-2518447510905190948-a81455f6-6c0b-4e02-89b0-57dfddf1fb97-1200-c000.snappy.parquet")
val words = lines.collect
lines.count
показывает 5 записей и содержимое, упорядоченное последовательно в соответствии с фреймом данных.
lines: org.apache.spark.sql.DataFrame = [customer_id: int, dummy: int ... 5 more fields]
words: Array[org.apache.spark.sql.Row] = Array([1,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [2,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [3,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [4,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX], [5,1,2019-01-01,2019-01-01,1,2,XXXXXXXXXXXXXXXXXXXXXXXXXX])
res11: Long = 5
Выполнено это для всех файлов, но отображается только одна.
Заключительные комментарии
Является ли это хорошей идеей, это отдельная история, например, подумайте о не транслируемых соединениях, которые являются проблемой.
Кроме того, я бы, очевидно, не жестко закодировал 4, а применил бы некоторую формулу для N, которую нужно применить к partitionByRange!Например:
val N = some calculation based on counts in DF and your cluster
val df2 = df.repartition(N, $"c1", $"c2")
Вы должны протестировать DF Writer, поскольку документация не совсем понятна.
Проверено на кластере EMR с 2M записями, 4 файлами, а также с точки зрения вывода.