Перечислите все файлы в Azure Blob с помощью блоков данных - PullRequest
0 голосов
/ 26 февраля 2020

Я после сценария pyspark (python), чтобы вывести список всех файлов в хранилище BLOB-объектов Azure (включая подкаталоги). Я нашел скрипт для этой цели в scala, нужна помощь в преобразовании этого скрипта в pyspark.

https://docs.microsoft.com/en-us/azure/databricks/kb/data/list-delete-files-faster#list -файлов Масштабный код


    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{Path, FileSystem}
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
    import java.net.URI

    def listFiles(basep: String, globp: String): Seq[String] = {
      val conf = new Configuration(sc.hadoopConfiguration)
      val fs = FileSystem.get(new URI(basep), conf)

      def validated(path: String): Path = {
        if(path startsWith "/") new Path(path)
        else new Path("/" + path)
      }

      val fileCatalog = InMemoryFileIndex.bulkListLeafFiles(
        paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
        hadoopConf = conf,
        filter = null,
        sparkSession = spark)

      fileCatalog.flatMap(_._2.map(_.path))
    }

    val root = "/mnt/path/table"
    val globp = "[^_]*" // glob pattern, e.g. "service=webapp/date=2019-03-31/*log4j*"

    val files = listFiles(root, globp)
    files.toDF("path").show()

I удалось преобразовать код в pyspark, но я получаю сообщение об ошибке ниже. У объекта 'JavaMember' нет атрибута 'globPath'


    configuration = sc._jvm.org.apache.hadoop.conf
    fspath = sc._jvm.org.apache.hadoop.fs
    hadooputil = sc._jvm.org.apache.spark.deploy.SparkHadoopUtil
    inmemfileindex = sc._jvm.org.apache.spark.sql.execution.datasources.InMemoryFileIndex
    javauri = sc._jvm.java.net.URI

    rootURL = "/mnt/"
    globp = "[^_]*" #glob pattern, e.g. "service=webapp/date=2019-03-31/*log4j*"

    conf = sc._jvm.org.apache.hadoop.conf.Configuration(sc._jsc.hadoopConfiguration())
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jvm.java.net.URI(rootURL),conf)


    g=sc._jvm.org.apache.hadoop.fs.Path.mergePaths(sc._jvm.org.apache.hadoop.fs.Path(rootURL),  sc._jvm.org.apache.hadoop.fs.Path("/" + globp))

    hadooputil.get.globPath(fs,g)

Любая помощь приветствуется.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...