Обрабатывать несколько каталогов в Spark отдельно - PullRequest
2 голосов
/ 09 июля 2019

У меня есть список каталогов в HDFS, каждый из которых содержит несколько файлов.Моя цель - объединить все файлы из одного каталога в один файл, но для каждого каталога отдельно.Какой самый быстрый способ сделать это в искре?Последовательная итерация по всем каталогам идет слишком медленно.Поэтому я хочу сделать это параллельно.Одним из решений может быть использование пула потоков.Может быть, есть более быстрый и быстрый родной?

Спасибо!

1 Ответ

3 голосов
/ 09 июля 2019

Рассмотрим следующие тестовые каталоги foo и bar, которые содержат следующие файлы:

cat /tmp/foo/0.csv
4
cat /tmp/foo/1.csv
3
cat /tmp/bar/0.csv
7

Мы можем прочитать их, используя следующий фрагмент:

val df = spark.read.csv("/tmp/foo", "/tmp/bar")
  .withColumn("dir", regexp_extract(input_file_name(), """([^/]*)/[^/]+\.csv$""", 1))
df.show()
/*
+---+---+
|_c0|dir|
+---+---+
|4  |foo|
|3  |foo|
|7  |bar|
+---+---+
*/

Функция input_file_name дает абсолютный путь к файлу, поэтому мы можем использовать его для получения каталога.Функция regexp_extract используется только для преобразования, например, /tmp/foo/1.csv -> foo.

Когда Spark записывает файлы, он выводит один файл на раздел.Таким образом, нам нужно перераспределить по столбцу dir, чтобы объединить все файлы под каждым каталогом.Наконец, мы можем использовать partitionBy, чтобы получить имя каталога и структуру выходного файла.Например,

df.repartition($"dir")
  .write
  .partitionBy("dir")
  .csv("/tmp/out")

создаст файлы

/tmp/out/._SUCCESS.crc
/tmp/out/dir=bar/.part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc
/tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/_SUCCESS
/tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/dir=foo/.part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc

, где /tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv содержит

7

, а /tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv содержит

4
3

AFAIK, невозможно записать эти выходные файлы в ту же структуру каталогов, что и исходный ввод, например, без наличия настроенного класса Hadoop FileSystem и т. Д.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...