The code below should solve your problem; I tested it with Spark 3.0.0 / Scala 2.12.10.
import org.apache.spark.sql.functions._
import spark.implicits._ // for toDF; spark is the SparkSession (already in scope in spark-shell)

val df_input = Seq(("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""")).toDF("p_id", "p_meta")
df_input.show()
/*
+----+----------------+
|p_id| p_meta|
+----+----------------+
| p1|{"a": 1, "b": 2}|
| p2| {"c": 3}|
+----+----------------+
*/
// UDF to convert the quote-stripped JSON string to a Map
def convert(str: String): Map[String, String] = {
  // every "key: value" pair in the string becomes one map entry
  "(\\w+): (\\w+)".r.findAllMatchIn(str).map(m => (m.group(1), m.group(2))).toMap
}
val udfConvert = spark.udf.register("udfConvert", convert _)
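// Side note: spark.udf.register also exposes the function to Spark SQL.
// If you only need it in the DataFrame API, functions.udf is enough --
// a minimal sketch (udfConvertAlt is a name introduced here, not from the original):
val udfConvertAlt = udf(convert _)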
// Remove the double quotes so the \w+ regex in convert() can match the keys and values
val df = df_input.withColumn("p_meta", regexp_replace($"p_meta", "\"", ""))
df.show()
/*
+----+------------+
|p_id| p_meta|
+----+------------+
| p1|{a: 1, b: 2}|
| p2| {c: 3}|
+----+------------+
*/
val df1 = df.withColumn("new_col", udfConvert($"p_meta"))
df1.show()
/*
+----+------------+----------------+
|p_id| p_meta| new_col|
+----+------------+----------------+
| p1|{a: 1, b: 2}|[a -> 1, b -> 2]|
| p2| {c: 3}| [c -> 3]|
+----+------------+----------------+
*/
df1.select($"p_id",$"p_meta",$"new_col",explode($"new_col")).drop($"p_meta").drop($"new_col").withColumn("p_meta_key",$"key").withColumn("p_mata_value",$"value").drop($"key").drop($"value").show()
/*
+----+----------+------------+
|p_id|p_meta_key|p_meta_value|
+----+----------+------------+
| p1| a| 1|
| p1| b| 2|
| p2| c| 3|
+----+----------+------------+
*/
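// Aside: since p_meta is valid JSON before the quote stripping, the built-in
// from_json with a MapType schema can replace the regex UDF entirely --
// a sketch under that assumption, producing the same three-column output:
import org.apache.spark.sql.types.{MapType, StringType}

df_input
  .withColumn("new_col", from_json($"p_meta", MapType(StringType, StringType)))
  .select($"p_id", explode($"new_col").as(Seq("p_meta_key", "p_meta_value")))
  .show()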