Как распаковать столбец в DataFrame Spark с помощью Spark Scala - PullRequest
0 голосов
/ 16 апреля 2019

У меня есть паркетные файлы со столбцом, содержащим сжатый контент.В настоящее время моя работа Spark (написана на Scala) использует цепочку Java.io Reader для упорядочения содержимого:

val output: StringBuilder = new StringBuilder
val byteArrayInputStream: ByteArrayInputStream = new ByteArrayInputStream(x)
try {
  val gzipInputStream: GZIPInputStream = new GZIPInputStream(byteArrayInputStream)
  try {
    val inputStreamReader: InputStreamReader = new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8)
    try {
      val bufferedReader: BufferedReader = new BufferedReader(inputStreamReader)
      try {
        var line: String = null
        do {
          line = bufferedReader.readLine()
          if (line != null)
            output.append(line)
        } while (line != null)

      } finally {
        if (bufferedReader != null) {
          bufferedReader.close()
        }
      }
    }
    finally {
      if (inputStreamReader != null) {
        inputStreamReader.close()
      }
    }
  }
  finally {
    if (gzipInputStream != null) {
      gzipInputStream.close()
    }
  }
}
finally {
  if (byteArrayInputStream != null) {
    byteArrayInputStream.close()
  }
}
val out = output.toString
return out

Но это приводит к исключению java.lang.OutOfMemoryError: GC overhead limit exceeded в кластере Hadoop.

Существуют ли более эффективные методы для разархивирования содержимого?

1 Ответ

0 голосов
/ 16 апреля 2019

Вы можете определить искровой UDF (пользовательскую функцию), который распаковывает байтовый массив gzip:

  1. определяет UDF, который получает байтовый массив и возвращает строку
    static UDF1 unzip = (UDF1<byte[], String>) YourClass::gzipDecompress;
зарегистрировать этот UDF
    spark.sqlContext().udf().register("unzip", unzip, DataTypes.StringType);
попросите спарк вычислить столбец с использованием UDF
    df.withColumn("unzipped_column", callUDF("unzip", col("your_original_column_with_gzip_data")))

Вы можете получить выгоду, взглянув на другую аналогичную реализацию декомпрессии gzip в scala, игнорируя сбои:

def decompress(compressed: Array[Byte]): String = {
    val inputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
    scala.io.Source.fromInputStream(inputStream).mkString
}

источник: https://github.com/rest-assured/rest-assured/blob/master/examples/scalatra-example/src/main/scala/io/restassured/scalatra/support/Gzip.scala

Примечание: пример UDF написан на java, но должен быть очень похожим на scala, см. https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/functions.html#callUDF-java.lang.String-org.apache.spark.sql.Column...-

...