Ошибка файлов Apache Commons CompressorInputStream и HDFS Gzip. Не найден компрессор для подписи потока. - PullRequest
0 голосов
/ 27 апреля 2019

При написании программного обеспечения для чтения файлов из файловой системы HDFS разных типов (например, некоторые из них являются архивами tar, некоторые являются простым текстом, некоторые являются двоичными файлами, а некоторые - сжатием gzip или другим способом), я обнаружил, что могу использовать Метод CompressorStreamFactory.detect для правильной идентификации сжатых файлов, как и ожидалось, при попытке создать BufferedReader для чтения сжатых данных построчно, появляется ошибка.

Я пробовал следующее:

val hdfsConf: Configuration = new Configuration()
hdfsConf.set("fs.hdfs.impl",classOf[DistributedFileSystem].getName)
hdfsConf.set("fs.file.impl",classOf[LocalFileSystem].getName)
hdfsConf.set(FileSystem.DEFAULT_FS,"hdfs://a-namenode-that-exists:8020")

val fs: FileSystem = FileSystem.get(new URI("hdfs://a-namenode-that-exists:8020"),hdfsConf)

def getFiles(directory: Path): Unit = {
    val iter: RemoteIterator[LocatedFileStatus] = fs.listFiles(directory,true)

    var c: Int = 0

    while(iter.hasNext && c < 3) {
        val fileStatus: LocatedFileStatus = iter.next()
        val path: Path = fileStatus.getPath
        val hfs: FileSystem = path.getFileSystem(hdfsConf)
        val is: FSDataInputStream = hfs.open(path)

        val t: String = CompressorStreamFactory.detect(new BufferedInputStream(is))
        System.out.println(s"|||||||||| $t |=| ${path.toString}")

        val reader: BufferedReader = new BufferedReader(new InputStreamReader(new CompressorStreamFactory().createCompressorInputStream(new BufferedInputStream(is))))

        var cc: Int = 0
        while(cc < 10) {
            val line: String = reader.readLine()
            System.out.println(s"|||||||||| $line")
            cc += 1
        }
        c += 1
    }
}

getFiles(new Path("/some/directory/that/is/definitely/there/"))


Я ожидал, что, поскольку я смог успешно использовать метод CompressorStreamFactory.detect для правильной идентификации моего файла как gzip, чтение файла также должно работать. Результирующий класс, возвращаемый методом открытия hdfs, называется FSDataInputStream, который является производным от InputStream (и FilteredInputStream), поэтому я ожидал, что, поскольку я широко использовал библиотеку Apache Commons Compress для чтения архивных и сжатых файлов из обычной файловой системы Linux, и я могу определить используемое сжатие, чтобы оно работало нормально и с HDFS ... но, увы, я получаю сообщение об ошибке:

Exception in thread "main" org.apache.commons.compress.compressors.CompressorException: No Compressor found for the stream signature.
    at org.apache.commons.compress.compressors.CompressorStreamFactory.detect(CompressorStreamFactory.java:525)
    at org.apache.commons.compress.compressors.CompressorStreamFactory.createCompressorInputStream(CompressorStreamFactory.java:542)
    at myorg.myproject.scratch.HdfsTest$.getFiles$1(HdfsTest.scala:34)
    at myorg.myproject.scratch.HdfsTest$.main(HdfsTest.scala:47)
    at myorg.myproject.scratch.HdfsTest.main(HdfsTest.scala)

Я довольно привязан к библиотеке Apache Commons, потому что ее фабричные методы уменьшают сложность моего кода для чтения архивных и сжатых файлов (не только tar или gzip). Я надеюсь, что есть простое объяснение того, почему обнаружение работает, а чтение - нет ... но у меня закончились идеи, как это понять. Единственная мысль, которая у меня возникла, заключается в том, что происхождение FilteredInputStream объекта FSDataInputStream может испортить ситуацию ... но я понятия не имею, как это можно исправить, если бы это была настоящая проблема.

1 Ответ

0 голосов
/ 27 апреля 2019

После вызова CompressorStreamFactory.detect(new BufferedInputStream(is)) inputtream is уже частично используется. Таким образом, это выглядит искаженным CompressorStreamFactory().createCompressorInputStream. Попробуйте открыть файл вместо повторного использования is.

...