Сопоставьте [String, java.lang.Object] с проблемой схемы DataFrame - PullRequest
0 голосов
/ 06 ноября 2018

Я должен определить схему из значений (не ключей) Map[String, Object].

Пример карты:

val myMap = Map("k1" -> 1, "k2" -> "", "k3"->  new Timestamp(new Date().getTime), "k4" -> 2.0 )

В настоящее время я создал схему из ключей, как показано ниже:

// I have created a schema using keys
val schema = StructType(myMap.keys.toSeq.map {
  StructField(_, StringType) // StringType is wrong since Object in the Map can be of any datatype
}

// I have created a RDD like below
val rdd = sc.parallelize(Seq(Row.fromSeq(myMap.values.toSeq)))
val df = sc.createDataFrame(rdd,schema)

Но теперь моя проблема в том, что объект может быть двойным, или датой, или отметкой времени, или чем-то еще. Но я создал схему, используя StringType, как описано выше, что неправильно.

Есть ли идеи создания схемы из значений Map, которые являются объектами?

1 Ответ

0 голосов
/ 08 ноября 2018

Ссылки: это идея из dataTypeFor ScalaReflection из искровой код

Вы можете создать структуру, подобную этой

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

 /**
    *createStruct based on datatype
    * @param myObject Object
    * @return [[DataType]]
    */
  def createStruct(myObject: Object): DataType = {

    myObject match {
      case t if t.isInstanceOf[String] => StringType
      case t if t.isInstanceOf[Long] => LongType
      case t if t.isInstanceOf[Integer] => IntegerType
      case t if t.isInstanceOf[Float] => FloatType
      case t if t.isInstanceOf[Double] => DoubleType
      case t if t.isInstanceOf[java.sql.Timestamp] => TimestampType
    }
  }

Ниже приведен пример фрагмента, который вызывает вышеуказанную функцию.

val a: Seq[(Object, Object)] = myMap.keys.toList.zip(columnsMap.values.toList)
    logger.info("" + a.toString)

    val list = ListBuffer.empty[StructField]

    a.foreach { x => {
      list += StructField(x._1.toString, createStruct(x._2), false)
      //println(createStruct(x._2) + "--" + x.toString())
    }
      //   )
    }
    println("list is " + list)
    val schema = StructType(list.toList)
    println("-----" + schema.treeString)
    val df = sparkSession.sqlContext.createDataFrame(rdd, schema)
    df.printSchema()
    df.show
...