Как составить список файлов в озере данных azure, используя spark из pycharm (локальная среда IDE), который подключен с помощью databricks-connect - PullRequest
0 голосов
/ 09 февраля 2020

Я работаю над кодом на моей локальной машине на pycharm. Выполнение выполняется в кластере блоков данных, в то время как данные хранятся в azure datalake.

В принципе, мне нужно перечислить файлы в каталоге azure datalake и затем применить некоторые логи чтения c для файлов, для этого я использую приведенный ниже код

sc = spark.sparkContext
hadoop = sc._jvm.org.apache.hadoop

fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()

path = hadoop.fs.Path('adl://<Account>.azuredatalakestore.net/<path>')
for f in fs.get(conf).listStatus(path):
    print(f.getPath(), f.getLen())

, приведенный выше код прекрасно работает на ноутбуках с базами данных, но когда я пытаюсь запустить тот же код через pycharm с помощью databricks-connect, я получаю следующее ошибка.

"Wrong FS expected: file:///....."

при некотором копании выясняется, что код ищет на моем локальном диске поиск "пути". У меня была похожая проблема с python библиотеками (os, pathlib)

У меня нет проблем с запуском другого кода в кластере.

Нужна помощь в выяснении, как выполнить это так, чтобы искать данные, а не мою локальную машину.

Кроме того, клиент azure -datalake-store не доступен из-за определенных ограничений.

1 Ответ

0 голосов
/ 22 февраля 2020

Вы можете использовать это.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import java.net.URI

def listFiles(basep: String, globp: String): Seq[String] = {
  val conf = new Configuration(sc.hadoopConfiguration)
  val fs = FileSystem.get(new URI(basep), conf)

  def validated(path: String): Path = {
    if(path startsWith "/") new Path(path)
    else new Path("/" + path)
  }

  val fileCatalog = InMemoryFileIndex.bulkListLeafFiles(
    paths = SparkHadoopUtil.get.globPath(fs, Path.mergePaths(validated(basep), validated(globp))),
    hadoopConf = conf,
    filter = null,
    sparkSession = spark)

  fileCatalog.flatMap(_._2.map(_.path))
}

val root = "/mnt/{path to your file directory}"
val globp = "[^_]*"

val files = listFiles(root, globp)
files.toDF("path").show()
...