Преобразовать фрейм данных в строку json в Spark - PullRequest
0 голосов
/ 03 мая 2019

Я немного новичок в Spark и Scala. У меня есть (большой ~ 1 миллион) Scala Spark DataFrame, и мне нужно сделать его строкой json. схема DF, как это

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
        |--valKey(String)
        |--vslScore(Double) 

ключ - это идентификатор продукта, а значение - это некоторый набор продуктов, а это значения баллов, которые я получаю из файла паркета. Мне только удается получить что-то подобное. Для фигурных скобок я просто объединяю их в результат.

3434343<tab>{smartphones:apple:0.4564879,smartphones:samsung:0.723643 }

Но я ожидаю, что значение, подобное этому. Каждое значение должно иметь

3434343<tab>{"smartphones:apple":0.4564879, "smartphones:samsung":0.723643 }

В любом случае, я напрямую конвертирую это в строку Json, не объединяя ничего. Я надеюсь записать выходные файлы в формате .csv. Это код, который я использую

 val df = parquetReaderDF.withColumn("key",col("productId"))         
       .withColumn("value", struct(
         col("productType"),
         col("brand"),
         col("score")))
       .select("key","value")
val df2 = df.withColumn("valKey", concat(
  col("productType"),lit(":")
  ,col("brand"),lit(":"),

  col("score")))
  .groupBy("key")
  .agg(collect_list(col("valKey")))
  .map{ r =>
    val key = r.getAs[String]("key")
    val value = r.getAs[Seq[String]]  ("collect_list(valKey)").mkString(",")

    (key,value)
  }
  .toDF("key", "valKey")
  .withColumn("valKey", concat(lit("{"), col("valKey"),  lit("}")))


  df.coalesce(1)
       .write.mode(SaveMode.Overwrite)
       .format("com.databricks.spark.csv")
       .option("delimiter", "\t")
       .option("header", "false")
       .option("quoteMode", "yes")
       .save("data.csv")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...