Scala; рекурсивно пройти все каталоги в родительском каталоге Had oop - PullRequest
0 голосов
/ 09 марта 2020

Я пытался создать рекурсивную функцию для обхода всех каталогов по родительскому пути Had oop. У меня есть следующая функция ниже, но вывод представляет собой набор вложенных массивов объектов, поэтому не совсем то, что я ищу, но он идет по пути Had oop. Любые предложения с благодарностью. Моя цель состоит в том, чтобы тип возвращаемого значения был Array [Path].

Получить пути к разделам нижнего уровня для примера родительского каталога: parent /hadoop/parent/path с разделами month, day так что в этом случае мы ожидаем массив с 365 путями.

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
val parentPath = "/hadoop/parent/path"
val hdfsPath: Path = new Path(parentPath)

def recursiveWalk(hdfsPath: Path): Array[Object] = {
    val fs: FileSystem = hdfsPath.getFileSystem(spark.sessionState.newHadoopConf())
    val fileIterable = fs.listStatus(hdfsPath)
    val res = for (f <- fileIterable) yield {
        if (f.isDirectory) {
            recursiveWalk(f.getPath).distinct
        }
        else {
            hdfsPath
        }
    }
    res.distinct
}

Ответы [ 2 ]

1 голос
/ 09 марта 2020

попробуйте использовать это:

def recursiveWalk(hdfsPath: Path): Array[Path] = {
    val fs: FileSystem = hdfsPath.getFileSystem(spark.sessionState.newHadoopConf())
    if (fs.isDirectory(hdfsPath)) {
      fs.listStatus(hdfsPath).flatMap(innerPath => recursiveWalk(innerPath.getPath))
    } else Array.empty[Path]
  }

или, если вам нужны файлы в директориях, также используйте:

def getDirsWithFiles(hdfsPath: Path): Array[Path] = {
    val fs: FileSystem = hdfsPath.getFileSystem(spark.sessionState.newHadoopConf())
    if (fs.isDirectory(hdfsPath)) {
      fs.listStatus(hdfsPath).flatMap(innerPath => getDirsWithFiles(innerPath.getPath))
    } else Array(hdfsPath)
  }
1 голос
/ 09 марта 2020

Вы определили рекурсивную функцию, которая генерирует массив (для l oop):

  • вывод функции, если элемент является каталогом, который является массивом объектов.
  • a Path, если это простой файл.

Это объясняет тот факт, что вы получаете вложенные массивы (массив массивов).

Вы можете использовать flatMap чтобы избежать этой проблемы. Он преобразует (или «выравнивает») список объектов в список объектов. Кроме того, для получения ожидаемого типа вам необходимо иметь совпадающие типы между вашим условием остановки и рекурсией (Array из Path). Поэтому вам нужно обернуть hdfsPath внутри массива.

Вот как можно быстро решить вашу проблему, основываясь на том, что я только что написал:

def recursiveWalk(hdfsPath: Path): Array[Path] = {
    val fs: FileSystem = hdfsPath.getFileSystem(spark.sessionState.newHadoopConf())
    val fileIterable = fs.listStatus(hdfsPath)
    val res = fileIterable.flatMap(f => {
        if (f.isDirectory) {
            recursiveWalk(f.getPath).distinct
        }
        else {
            Array(hdfsPath)
        }
    })
    res.distinct
}

Приведенный выше код устраняет проблему, но Избегая использования различных, вы можете поместить условие во входной файл вместо его подпапок, как показано ниже. Вы также можете определить файловую систему раз и навсегда вне функции.

val conf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem(conf)

def recursiveWalk(path : Path): Array[Path] = {
    if(hdfs.isDirectory(path))
        hdfs.listStatus(path).map(_.getPath).flatMap(rec _) :+ path
    else Array()
}
...