Может ли spark игнорировать нечитаемые файлы? - PullRequest
0 голосов
/ 14 декабря 2018

У меня есть файловая структура в следующей форме

s3://<bucket>/year=2018/month=11/day=26/hour=10/department=x/part-xxxxx.gz.parquet

Мои учетные данные AWS НЕ имеют доступа ко всем значениям department=, только к нескольким.

Я пытаюсь выполнить

df = spark.read.parquet("s3://<bucket>/year=2018/") 

И это не с

java.io.IOException: Could not read footer: java.io.IOException: Could not read footer for file FileStatus{path=s3://<bucket>/year=2018/month=11/day=26/hour=10/department=yyyyyy/part-xxxxx.gz.parquet; isDirectory=false; length=104448; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}

Сбой, очевидно, потому что я могу получить доступ только к department=x, а не department=yyyy, мой вопрос: есть ли способ просто молча игнорировать их?

Мой текущий обходной путь - построить только допустимые пути , например:

   tmpl = 's3://<bucket>/year=2018/month=11/day=26/hour={hour}/department=x/'
   df = spark.read.parquet(*list(tmpl.format(hour=hour) for hour in range(1,24)))

, что очень громоздко , поскольку

  1. некоторые пути могут не существовать (отсутствуют данные в течение нескольких часов и т. Д.).
  2. spark.read.parquet нетвзять список или генератор в качестве входных данных, поэтому я вынужден использовать оператор splat / unpack, чтобы преобразовать все в отдельные аргументы.Не уверен, что это работает нормально с тысячами путей.

Есть ли лучший способ загрузить эти данные без изменения файловой структуры (которую я не контролирую)?

1 Ответ

0 голосов
/ 15 декабря 2018

Сбой, очевидно, потому что я могу получить доступ только к отделу = x, а не к отделу = гггг, мой вопрос: есть ли способ просто молча игнорировать их?

согласно состоянию файлаобъект, который вы имеете право на чтение и запись (permission=rw-rw-rw-) в файл, указанный в журнале.Возможно, другая проблема связана с путем к файлу.

java.io.IOException: Не удалось прочитать нижний колонтитул: java.io.IOException: Не удалось прочитать нижний колонтитул для файла FileStatus {path = s3: ///год = 2018 / месяц = ​​11 / день = 26 / час = 10 / отдел = YYYYYY / неполный xxxxx.gz.parquet;isDirectory = ложь;длина = 104448;Репликация = 0;размер_блок = 0;MODIFICATION_TIME = 0;access_time = 0;владелец =;группа =;разрешение = RW-RW-rw-;isSymlink = false}

Из вышеприведенной ошибки вы передаете объект fileStatus в неверном формате строки.

вам нужно передать fileStat.getPath.toString, т.е. в вашем случае path=s3:///year=2018/month=11/day=26/hour=10/department=yyyyyy/part-xxxxx.gz.parquet;

df = spark.read.parquet (...) будет работать.

если вы хотите пропустить папку с файлами паркета, вы можете пропустить.

или вы хотите фильтрнекоторые файлы ниже примера кода фрагмент кода вы можете использовать

/**
    * getAllFilePath.
    *
    * @param filePath Path
    * @param fs       FileSystem
    * @return list of absolute file path present in given path
    * @throws FileNotFoundException
    * @throws IOException
    */
  @throws[FileNotFoundException]
  @throws[IOException]
  def getAllFilePath(filePath: Path, fs: FileSystem): ListBuffer[String] = {
    val fileList = new ListBuffer[String]
    val fileStatus = fs.listStatus(filePath)
    for (fileStat <- fileStatus) {
      logInfo(s"file path Name : ${fileStat.getPath.toString} length is  ${fileStat.getLen}")
      if (fileStat.isDirectory) fileList ++= (getAllFilePath(fileStat.getPath, fs))
      else if (fileStat.getLen > 0 && !fileStat.getPath.toString.isEmpty) {
        logInfo("fileStat.getPath.toString" + fileStat.getPath.toString)
        fileList.foreach(println)
        fileList += fileStat.getPath.toString
      } else if (fileStat.getLen == 0) {
        logInfo(" length zero files \n " + fileStat)

        // fs.rename(fileStat.getPath, new Path(fileStat.getPath+"1"))
      }
    }
    fileList
  }

, как в этом примере

    val fs = FileSystem.get(new URI(inputPath), spark.sparkContext.hadoopConfiguration)
 yourFiles = getAllFilePath(new Path(inputPath), fs)

val df = spark.read.parquet(yourFiles:_*)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...