Рассмотрим следующие тестовые каталоги foo
и bar
, которые содержат следующие файлы:
cat /tmp/foo/0.csv
4
cat /tmp/foo/1.csv
3
cat /tmp/bar/0.csv
7
Мы можем прочитать их, используя следующий фрагмент:
val df = spark.read.csv("/tmp/foo", "/tmp/bar")
.withColumn("dir", regexp_extract(input_file_name(), """([^/]*)/[^/]+\.csv$""", 1))
df.show()
/*
+---+---+
|_c0|dir|
+---+---+
|4 |foo|
|3 |foo|
|7 |bar|
+---+---+
*/
Функция input_file_name
дает абсолютный путь к файлу, поэтому мы можем использовать его для получения каталога.Функция regexp_extract
используется только для преобразования, например, /tmp/foo/1.csv -> foo
.
Когда Spark записывает файлы, он выводит один файл на раздел.Таким образом, нам нужно перераспределить по столбцу dir
, чтобы объединить все файлы под каждым каталогом.Наконец, мы можем использовать partitionBy
, чтобы получить имя каталога и структуру выходного файла.Например,
df.repartition($"dir")
.write
.partitionBy("dir")
.csv("/tmp/out")
создаст файлы
/tmp/out/._SUCCESS.crc
/tmp/out/dir=bar/.part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc
/tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/_SUCCESS
/tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/dir=foo/.part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc
, где /tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
содержит
7
, а /tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
содержит
4
3
AFAIK, невозможно записать эти выходные файлы в ту же структуру каталогов, что и исходный ввод, например, без наличия настроенного класса Hadoop FileSystem
и т. Д.