Пара rdd сохранить в файл parque scala - PullRequest
0 голосов
/ 24 апреля 2020

У меня есть RDD [Map [String, String]], мне нужно преобразовать его в формат данных, чтобы я мог сохранить данные в файле паркета, где ключами карты является имя столбца.

Например:

         Map("city" -> "delhi", "ip" -> "42.1.15.102", "source" -> "PlayStore","createdDate"->"2020-04-21"),
          Map("city" -> "", "ip" -> "42.06.15.102", "source" -> "PlayStore","createdDate"->"2020-04-22")))```

Output:
**City** | **ip**
Delhi    | 1.234


1 Ответ

1 голос
/ 24 апреля 2020

Там я приведу некоторые рекомендации по решению вашей проблемы

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object MapToDfParquet {

  val spark = SparkSession
    .builder()
    .appName("MapToDfParquet")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","MapToDfParquet") // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val sqlContext = spark.sqlContext

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      import spark.implicits._

      val data = Seq(Map("city" -> "delhi", "ip" -> "42.1.15.102", "source" -> "PlayStore","createdDate"->"2020-04-21"),
                     Map("city" -> "", "ip" -> "42.06.15.102", "source" -> "PlayStore","createdDate"->"2020-04-22"))
        .map( seq => seq.values.mkString(","))

      val df = sc.parallelize(data)
        .map(str => str.split(","))
        .map(arr => (arr(0),arr(1),arr(2),arr(3)))
        .toDF("city", "ip","source","createdDate")

      df.show(truncate = false)

      // by default writes it will write as parquet with snappy compression
      // we change this behavior and save as parquet uncompressed
      sqlContext.setConf("spark.sql.parquet.compression.codec","uncompressed")

      df
        .write
        .parquet("hdfs://quickstart.cloudera/user/cloudera/parquet")

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

ожидаемый результат

+-----+------------+---------+-----------+
|city |ip          |source   |createdDate|
+-----+------------+---------+-----------+
|delhi|42.1.15.102 |PlayStore|2020-04-21 |
|     |42.06.15.102|PlayStore|2020-04-22 |
+-----+------------+---------+-----------+

Надеюсь, это может быть полезно, С уважением.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...