Я немного новичок в Spark и Scala. У меня есть (большой ~ 1 миллион) Scala Spark DataFrame, и мне нужно сделать его строкой json.
схема DF, как это
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
|--valKey(String)
|--vslScore(Double)
ключ - это идентификатор продукта, а значение - это некоторый набор продуктов, а это значения баллов, которые я получаю из файла паркета.
Мне только удается получить что-то подобное. Для фигурных скобок я просто объединяю их в результат.
3434343<tab>{smartphones:apple:0.4564879,smartphones:samsung:0.723643 }
Но я ожидаю, что значение, подобное этому. Каждое значение должно иметь
3434343<tab>{"smartphones:apple":0.4564879, "smartphones:samsung":0.723643 }
В любом случае, я напрямую конвертирую это в строку Json, не объединяя ничего. Я надеюсь записать выходные файлы в формате .csv
. Это код, который я использую
val df = parquetReaderDF.withColumn("key",col("productId"))
.withColumn("value", struct(
col("productType"),
col("brand"),
col("score")))
.select("key","value")
val df2 = df.withColumn("valKey", concat(
col("productType"),lit(":")
,col("brand"),lit(":"),
col("score")))
.groupBy("key")
.agg(collect_list(col("valKey")))
.map{ r =>
val key = r.getAs[String]("key")
val value = r.getAs[Seq[String]] ("collect_list(valKey)").mkString(",")
(key,value)
}
.toDF("key", "valKey")
.withColumn("valKey", concat(lit("{"), col("valKey"), lit("}")))
df.coalesce(1)
.write.mode(SaveMode.Overwrite)
.format("com.databricks.spark.csv")
.option("delimiter", "\t")
.option("header", "false")
.option("quoteMode", "yes")
.save("data.csv")