распаковать (распаковать / извлечь) утилиту с помощью Spark scala - PullRequest
0 голосов
/ 17 июня 2020

У меня есть customer_input_data.tar.gz в HDFS, в котором есть данные 10 различных таблиц в формате файла csv. поэтому мне нужно распаковать этот файл в / my / output / path с помощью spark scala

, пожалуйста, предложите, как распаковать файл customer_input_data.tar.gz с помощью spark scala

Ответы [ 2 ]

1 голос
/ 17 июня 2020

gzip не является форматом splittable в Had oop. Следовательно, на самом деле файл не будет распределяться по кластеру, и вы не получите никаких преимуществ от распределенных вычислений / обработки в has oop или Spark.

Лучше всего,

  • распакуйте файл в ОС, а затем по отдельности отправьте файлы обратно в has oop.

Если вы все еще хотите распаковать в scala, вы можете просто прибегнуть к java класс GZIPInputStream через

new GZIPInputStream(new FileInputStream("your file path"))
0 голосов
/ 29 июня 2020

Я разработал приведенный ниже код для распаковки файлов с использованием scala. Вам необходимо передать путь ввода и путь вывода и систему Hadoopfile

    /*below method used for processing zip files*/
  @throws[IOException]
  private def processTargz(fullpath: String, houtPath: String, fs: FileSystem): Unit = {
    val path = new Path(fullpath)
    val gzipIn = new GzipCompressorInputStream(fs.open(path))
    try {
      val tarIn = new TarArchiveInputStream(gzipIn)
      try {
        var entry:TarArchiveEntry = null
        out.println("Tar entry")
        out.println("Tar Name entry :" + FilenameUtils.getName(fullpath))
        val fileName1 = FilenameUtils.getName(fullpath)
        val tarNamesFolder = fileName1.substring(0, fileName1.indexOf('.'))
        out.println("Folder Name : " + tarNamesFolder)
        while ( {
          (entry = tarIn.getNextEntry.asInstanceOf[TarArchiveEntry]) != null
        }) { // entity Name as tsv file name which are part of inside compressed tar file
          out.println("ENTITY NAME : " + entry.getName)

          /** If the entry is a directory, create the directory. **/
          out.println("While")
          if (entry.isDirectory) {
            val f = new File(entry.getName)
            val created = f.mkdir
            out.println("mkdir")
            if (!created) {
              out.printf("Unable to create directory '%s', during extraction of archive contents.%n", f.getAbsolutePath)
              out.println("Absolute path")
            }
          }
          else {
            var count = 0
            val slash = "/"
            val targetPath = houtPath + slash + tarNamesFolder + slash + entry.getName
            val hdfswritepath = new Path(targetPath)
            val fos = fs.create(hdfswritepath, true)
            try {
              val dest = new BufferedOutputStream(fos, BUFFER_SIZE)
              try {
                val data = new Array[Byte](BUFFER_SIZE)
                while ( {
                  (count = tarIn.read(data, 0, BUFFER_SIZE)) != -1
                }) dest.write(data, 0, count)
              } finally if (dest != null) dest.close()
            }
          }
        }
        out.println("Untar completed successfully!")
      } catch {
        case e: IOException =>
          out.println("catch Block")
      } finally {
        out.println("FINAL Block")
        if (tarIn != null) tarIn.close()
      }
    }
  }
...