Чтение tar-файла из sftp и конвертирование его в gzip на hdfs на лету - PullRequest
0 голосов
/ 25 февраля 2019

Я пытаюсь прочитать tar-файл из SFTP-местоположения и посредством манипуляции с потоком распаковать его, а затем снова сжать его в gz в экосистеме hadoop.Я сжал простой файл с текстовым сообщением

Текст в сжатом файле.

Имя файла testDataTar.txt.tar и имя текстового файла внутри него:testDataTar.txt.Ниже приведен фрагмент кода, который выполняет чтение и преобразование:

def copyFilesToHdfs(sourceFile: FileObject, fs: FileSystem, targetPath: Path): String = {
   val fileName = sourceFile.getName.getBaseName.replaceAll("tar$", "gz")
   val fis = sourceFile.getContent.getInputStream    
   val is = new TarArchiveInputStream(fis)
   val targetFile = new Path(targetPath, fileName)

   val codec = new GzipCodec
   codec.setConf(new org.apache.hadoop.conf.Configuration())
   val os = new DataOutputStream(codec.createOutputStream(fs.create(targetFile)))

   IOUtils.copy(is, os)

   is.close()
   os.close()
   fis.close()

   fileName
}

Если при чтении в результирующем файле выполнить следующее:

val fileContent = Config.sparkContext.textFile(targetFile.toString).first()

Содержимое файла будетпусто (также проверялось вручную), и я получаю исключение следующим образом.

java.lang.UnsupportedOperationException: empty collection
20:22:42   at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1369)
20:22:42   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
20:22:42   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
20:22:42   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
20:22:42   at org.apache.spark.rdd.RDD.first(RDD.scala:1366)

Однако, если я просто делаю то же самое, но с fis, пропуская строку с TarArchiveInputStream и кормяСам текстовый файл, я получаю прекрасно сжатый файл gzip в качестве вывода на hdfs.

Итак, в общем, я почему-то не могу передать содержимое файла между потоками .tar и .gz.Я пробовал то же самое для .zip (ZipInputStream) в .gz, и результат был таким же.Любая помощь приветствуется.

...