Как преобразовать CSV в паркетный файл внутри HDFS - PullRequest
0 голосов
/ 29 апреля 2020

Я новичок в больших данных, поэтому oop и hdfs немного исчезли для меня, поэтому я прошу помощи. Теперь у меня есть 4 файла в формате csv, которые находятся в кластере HDFS, и я должен сделать 4 копии в формате PARQUET, используя Python, и я не представляю, как это сделать. Я надеюсь, что вы можете помочь мне с этим не сложным вопросом.

1 Ответ

0 голосов
/ 29 апреля 2020

Я поместил ваш пример в Scala код, но сделать это в Python - почти то же самое. Я также положил некоторые комментарии с некоторыми объяснениями

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ReadCsv {
  val spark = SparkSession
    .builder()
    .appName("ReadCsv")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","ReadCsv") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      val df = sqlContext
        .read
        .csv("/path/directory_to_csv_files/") // Here we read the .csv files
        .cache()

      df.repartition(4) // we get four files
          .write
          .parquet("/path/directory_to_parquet_files/") // output format file.parquet.snappy by default
      // if we want parquet uncompressed before write we have to do:
      // sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      spark.stop()
      println("SparkSession stopped")
    }
  }
}
...