Преобразование схемы из String в Array [Structype] с использованием Spark Scala - PullRequest
1 голос
/ 28 сентября 2019

У меня есть пример данных, как показано ниже, мне нужно будет преобразовать столбцы (ABS, ALT) из строки в Array [structType], используя искровой скала-код.Любая помощь приветствуется.

С помощью UDF я смог преобразовать строку в arrayType, но мне нужна помощь по преобразованию строки в массив [structType] для этих двух столбцов (ABS, ALT).

VIN         TT  MSG_TYPE ABS                           ALT
MSGXXXXXXXX 1   SIGL     [{"E":1569XXXXXXX,"V":0.0}] 
[{"E":156957XXXXXX,"V":0.0}]

df.currentSchema 
root
|-- VIN: string (nullable = true)
|-- TT: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- ABS: string (nullable = true)
|-- ALT: string (nullable = true)

df.expectedSchema:

|-- VIN: string (nullable = true)
|-- TT: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- ABS: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: long (nullable = true)
|-- ALT: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: double (nullable = true)

1 Ответ

1 голос
/ 28 сентября 2019

Вы можете использовать udf для анализа Json и преобразования его в массивы структур.

Сначала определите функцию, которая анализирует Json (на основе this ответ):

case class Data(E:String, V:Double)
class CC[T] extends Serializable { def unapply(a: Any): Option[T] = Some(a.asInstanceOf[T]) }
object M extends CC[Map[String, Any]]
object L extends CC[List[Any]]
object S extends CC[String]
object D extends CC[Double]

def toStruct(in: String): Array[Data] = {
  if( in == null || in.isEmpty) return new Array[Data](0)
  val result = for {
    Some(L(map)) <- List(JSON.parseFull(in))
    M(data) <- map
    S(e) = data("E")
    D(v) = data("V")
  } yield {
    Data(e, v)
  }
  result.toArray
}

Эта функция возвращает массив Data объектов, которые уже имеют правильную структуру.Теперь мы используем эту функцию для определения udf

val ts: String => Array[Data] = toStruct(_)
import org.apache.spark.sql.functions.udf
val toStructUdf = udf(ts)

Наконец, мы вызываем udf (например, в операторе select):

val df = ...
val newdf = df.select('VIN, 'TT, 'MSG_TYPE, toStructUdf('ABS).as("ABS"), toStructUdf('ALT).as("ALT"))
newdf.printSchema()

Вывод:

root
 |-- VIN: string (nullable = true)
 |-- TT: string (nullable = true)
 |-- MSG_TYPE: string (nullable = true)
 |-- ABS: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: string (nullable = true)
 |    |    |-- V: double (nullable = false)
 |-- ALT: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- E: string (nullable = true)
 |    |    |-- V: double (nullable = false)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...