Конвертировать искровой фрейм данных в json файлы, которые содержат массив json - PullRequest
0 голосов
/ 03 марта 2020

Я пишу Spark Application в scala, который читает таблицу HiveTable и сохраняет вывод в HDFS как Json файл формата.

Я читаю таблицу кустов с помощью HiveContext, и она возвращает DataFrame. Ниже приведен фрагмент кода.

val sparkConf = new SparkConf().setAppName("SparkReadHive")
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)

import sqlContext.implicits._

    val df = sqlContext.sql(
      """
        |SELECT *
        |FROM database.table
        |""".stripMargin)

df.write.format("json").save(path)

Мне нужен выходной файл, который выглядит следующим образом:

[{"name":"tom", "age": 8},
{"name":"Jerry", "age": 7}]

Однако то, что я получаю, выглядит ниже:

{"name":"tom", "age": 8}
{"name":"Jerry", "age": 7}

Может кто-нибудь, пожалуйста, помогите мне с этим? Спасибо!

1 Ответ

0 голосов
/ 03 марта 2020

Мы можем использовать .toJSON, collect() and .mkString метод для получения массива json объектов и с помощью файловой системы oop для создания файла в формате hdf с желаемым форматом.

Example:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.io._ 
import java.io._

//sample dataframe
val df=sc.parallelize(Seq(("tom",8),("Jerry",7))).toDF("name","age")

//making array of json object
val data=df.toJSON.collect().mkString("[",",\n","]")

//filesystem object
val path = new Path("hdfs://<namenode>:8020/<path>/myfile.txt")
val conf = new Configuration(sc.hadoopConfiguration)
val fs = path.getFileSystem(conf)
if (fs.exists(path))
    fs.delete(path, true)
val out = new BufferedOutputStream(fs.create(path))
out.write(data.getBytes("UTF-8"))
out.flush()
out.close()
fs.close()

Check contents of file in HDFS:

hadoop fs -cat myfile.txt
[{"name":"tom","age":8},
{"name":"Jerry","age":7}]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...