Следующий простой код читает JSON string
и создает набор данных, когда MyClass
является case class
, он работает, но когда MyClass
является non-case class
, он терпит неудачу, я думаю, что мы можем сделать скодировщик, но по какой-то причине это меня обманывает.
Настройка
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types._
val schema = StructType(List(StructField("KAFKA_ID", StringType, true),StructField("KAFKA_TS", StringType, true)))
Рабочая демонстрация класса Case
// ******* Working code for CASE - Class ***********
case class MyClass( KAFKA_ID: String, KAFKA_TS: String )
val jsonData1 = """{"KAFKA_ID": "1", "KAFKA_TS" : "T1"}"""
val jsonData2 = """{"KAFKA_ID": "2", "KAFKA_TS" : "T2"}"""
val res = spark.sqlContext.read.schema(schema).json(spark.sparkContext.parallelize(Seq(jsonData1, jsonData2))).as[MyClass]
res.show(10, false)
// ******* Working code for CASE - Class ***********
Неработающий код для Non-Case.Я видел поток переполнения стека, но не мог иметь никакого смысла из этого.
// ******* Not Working code for NON-CASE - Class ***********
class MyClass{
var _KAFKA_ID: String
var _KAFKA_TS: String
def KAFKA_ID_=(value:String):Unit=_KAFKA_ID = value
def KAFKA_ID=_KAFKA_ID
def KAFKA_TS_=(value:String):Unit=_KAFKA_TS = value
def KAFKA_TS=_KAFKA_TS
}
implicit val myClassEncoder = org.apache.spark.sql.Encoders.kryo[MyClass]
//val myClassEncoder = Encoders.bean(MyClass)
val jsonData1 = """{"KAFKA_ID": "1", "KAFKA_TS" : "T1"}"""
val jsonData2 = """{"KAFKA_ID": "2", "KAFKA_TS" : "T2"}"""
val res = spark.sqlContext.read.schema(schema).json(spark.sparkContext.parallelize(Seq(jsonData1, jsonData2))).as[MyClass]
res.show(10, false)
// ******* Not Working code for NON-CASE - Class ***********
Получение ошибки, как показано ниже
notebook:12: error: class MyClass needs to be abstract, since:
it has 2 unimplemented members.
/** As seen from class MyClass, the missing signatures are as follows.
* For convenience, these are usable as stub implementations.
*/
def _KAFKA_ID_=(x$1: String): Unit = ???
def _KAFKA_TS_=(x$1: String): Unit = ???
class MyClass{