Вы можете использовать udf для анализа Json и преобразования его в массивы структур.
Сначала определите функцию, которая анализирует Json (на основе this ответ):
case class Data(E:String, V:Double)
class CC[T] extends Serializable { def unapply(a: Any): Option[T] = Some(a.asInstanceOf[T]) }
object M extends CC[Map[String, Any]]
object L extends CC[List[Any]]
object S extends CC[String]
object D extends CC[Double]
def toStruct(in: String): Array[Data] = {
if( in == null || in.isEmpty) return new Array[Data](0)
val result = for {
Some(L(map)) <- List(JSON.parseFull(in))
M(data) <- map
S(e) = data("E")
D(v) = data("V")
} yield {
Data(e, v)
}
result.toArray
}
Эта функция возвращает массив Data
объектов, которые уже имеют правильную структуру.Теперь мы используем эту функцию для определения udf
val ts: String => Array[Data] = toStruct(_)
import org.apache.spark.sql.functions.udf
val toStructUdf = udf(ts)
Наконец, мы вызываем udf (например, в операторе select
):
val df = ...
val newdf = df.select('VIN, 'TT, 'MSG_TYPE, toStructUdf('ABS).as("ABS"), toStructUdf('ALT).as("ALT"))
newdf.printSchema()
Вывод:
root
|-- VIN: string (nullable = true)
|-- TT: string (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- ABS: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: string (nullable = true)
| | |-- V: double (nullable = false)
|-- ALT: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: string (nullable = true)
| | |-- V: double (nullable = false)