Перечислите все подкаталоги для заданного пути HDFS до определенной глубины, используя Scala? - PullRequest
1 голос
/ 04 февраля 2020

У меня есть различные проекты Spark, которые записывают данные в HDFS в нескольких секционированных форматах. Пример:

Формат 1:

/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x/dir4=y/second_id=4567

Формат 2:

/tmp/myProject2/dir1/dir2/parquet/first_id=3212/second_id=9129

Формат 3:

/tmp/myProject3/dir1/dir2/parquet/first_id=9912/dir3=x/second_id=1129

У меня вопрос к базовому пути, который является /tmp/<myProject>/dir1/dir2/parquet, какой самый простой способ динамически построить эти пути до second_id ?

Примечание: я не буду использовать sh для использования подстановочных знаков, но вместо этого хочу получить список всех этих путей динамически до second_id с учетом любого базового пути. Я не могу найти метод, который был бы достаточно гибким, чтобы создать список таких путей до second_id, учитывая любой базовый путь в качестве параметра.

То, что я пробовал до сих пор:

val fs = FileSystem.get(new Configuration())
val status = fs.listStatus(new Path("/tmp/myProject1/dir1/dir2/parquet/first_id=1234/"))
status.foreach(x=> println(x.getPath))

Это просто печатает до уровня 1:

/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=a
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=c

Вместо этого я хочу, чтобы он перечислял все файлы до second_id Например:

/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=x/dir4=y/second_id=4567
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=a/dir4=z/second_id=1231
/tmp/myProject1/dir1/dir2/parquet/first_id=1234/dir3=c/dir4=k/second_id=4123

И аналогично, для двух других форматов в нем должны быть перечислены все файлы вплоть до second_id. Есть ли возможное решение этого? Я довольно новичок в HDFS и Scala.

1 Ответ

2 голосов
/ 04 февраля 2020

Учитывая базовый путь, равный /tmp/<myProject>/dir1/dir2/parquet, что было бы самым простым способом динамически построить эти пути до second_id?

В Had * 1017 такой опции нет * FS API. Для файлов Вы можете использовать метод listFiles для рекурсивного вывода списка файлов, но у вас нет контроля над максимальной глубиной.

Для каталогов это можно сделать с помощью пользовательской рекурсивной функции, например:

import org.apache.hadoop.fs._

def listDirectories(baseFolder: Path, depth: Integer = 0, maxDepth: Integer = -1): Seq[Path] = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val ls = fs.listStatus(baseFolder)
  ls.filter(_.isDir).flatMap { s =>
    maxDepth match {
      case m if (m == -1 || depth < m) => listDirectories(s.getPath, depth + 1, maxDepth)
      case _ => Seq(s.getPath)

      }
    }
}

Используя ее для вашего примера:

val baseFolder = new Path("/tmp/myProject1/dir1/dir2/parquet/first_id=1234/")

// listing all subDirectories up to second_id
val subDirectories = listDirectories(baseFolder, maxDepth=-1)

// listing all subDirectories up to dir4
val subDirectories = listDirectories(baseFolder, maxDepth=1)
...