Я пытаюсь прочитать tar-файл из SFTP-местоположения и посредством манипуляции с потоком распаковать его, а затем снова сжать его в gz в экосистеме hadoop.Я сжал простой файл с текстовым сообщением
Текст в сжатом файле.
Имя файла testDataTar.txt.tar
и имя текстового файла внутри него:testDataTar.txt
.Ниже приведен фрагмент кода, который выполняет чтение и преобразование:
def copyFilesToHdfs(sourceFile: FileObject, fs: FileSystem, targetPath: Path): String = {
val fileName = sourceFile.getName.getBaseName.replaceAll("tar$", "gz")
val fis = sourceFile.getContent.getInputStream
val is = new TarArchiveInputStream(fis)
val targetFile = new Path(targetPath, fileName)
val codec = new GzipCodec
codec.setConf(new org.apache.hadoop.conf.Configuration())
val os = new DataOutputStream(codec.createOutputStream(fs.create(targetFile)))
IOUtils.copy(is, os)
is.close()
os.close()
fis.close()
fileName
}
Если при чтении в результирующем файле выполнить следующее:
val fileContent = Config.sparkContext.textFile(targetFile.toString).first()
Содержимое файла будетпусто (также проверялось вручную), и я получаю исключение следующим образом.
java.lang.UnsupportedOperationException: empty collection
20:22:42 at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1369)
20:22:42 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
20:22:42 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
20:22:42 at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
20:22:42 at org.apache.spark.rdd.RDD.first(RDD.scala:1366)
Однако, если я просто делаю то же самое, но с fis
, пропуская строку с TarArchiveInputStream
и кормяСам текстовый файл, я получаю прекрасно сжатый файл gzip в качестве вывода на hdfs.
Итак, в общем, я почему-то не могу передать содержимое файла между потоками .tar и .gz.Я пробовал то же самое для .zip (ZipInputStream
) в .gz, и результат был таким же.Любая помощь приветствуется.