Apache Spark создает набор данных с классом Non-Case - PullRequest
0 голосов
/ 05 октября 2018

Следующий простой код читает JSON string и создает набор данных, когда MyClass является case class, он работает, но когда MyClass является non-case class, он терпит неудачу, я думаю, что мы можем сделать скодировщик, но по какой-то причине это меня обманывает.

Настройка

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types._

val schema = StructType(List(StructField("KAFKA_ID", StringType, true),StructField("KAFKA_TS", StringType, true)))

Рабочая демонстрация класса Case

// ******* Working code for CASE - Class ***********
case class MyClass( KAFKA_ID: String, KAFKA_TS: String )
val jsonData1 = """{"KAFKA_ID": "1", "KAFKA_TS" : "T1"}"""
val jsonData2 = """{"KAFKA_ID": "2", "KAFKA_TS" : "T2"}"""
val res = spark.sqlContext.read.schema(schema).json(spark.sparkContext.parallelize(Seq(jsonData1, jsonData2))).as[MyClass]
res.show(10, false)
// ******* Working code for CASE - Class ***********

Неработающий код для Non-Case.Я видел поток переполнения стека, но не мог иметь никакого смысла из этого.

// ******* Not Working code for NON-CASE - Class ***********

class MyClass{
  var _KAFKA_ID: String
  var _KAFKA_TS: String
  def KAFKA_ID_=(value:String):Unit=_KAFKA_ID = value
  def KAFKA_ID=_KAFKA_ID
  def KAFKA_TS_=(value:String):Unit=_KAFKA_TS = value
  def KAFKA_TS=_KAFKA_TS
} 

implicit val myClassEncoder = org.apache.spark.sql.Encoders.kryo[MyClass]
//val myClassEncoder = Encoders.bean(MyClass)
val jsonData1 = """{"KAFKA_ID": "1", "KAFKA_TS" : "T1"}"""
val jsonData2 = """{"KAFKA_ID": "2", "KAFKA_TS" : "T2"}"""
val res = spark.sqlContext.read.schema(schema).json(spark.sparkContext.parallelize(Seq(jsonData1, jsonData2))).as[MyClass]
res.show(10, false)
// ******* Not Working code for NON-CASE - Class ***********

Получение ошибки, как показано ниже

notebook:12: error: class MyClass needs to be abstract, since:
it has 2 unimplemented members.
/** As seen from class MyClass, the missing signatures are as follows.
 *  For convenience, these are usable as stub implementations.
 */
  def _KAFKA_ID_=(x$1: String): Unit = ???
  def _KAFKA_TS_=(x$1: String): Unit = ???

class MyClass{
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...