Не найдено кодировщика для вложенного класса Java - PullRequest
0 голосов
/ 17 марта 2020

Я создал класс Scala следующим образом:

case class MyObjectWithEventTime(value: MyObject, eventTime: Timestamp)

MyObject - это объект Java.

Я пытаюсь использовать его в своей структуре Spark следующим образом Потоковое задание:

implicit val myObjectEncoder: Encoder[MyObject] = Encoders.bean(classOf[MyObject])

val withEventTime = mystream
 .select(from_json(col("value").cast("string"), schema).alias("value"))
 .withColumn("eventTime", to_timestamp(col("value.timeArrived")))
 .as[MyObjectWithEventTime]
 .groupByKey(row => {... some code here
 })
 .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
 .filter(col("id").isNotNull)
 .toJSON
 .writeStream
 .format("kafka")
 .option("checkpointLocation", "/tmp")
 .option("kafka.bootstrap.servers", "localhost:9092")
 .option("topic", conf.KafkaProperties.outputTopic)
 .option("checkpointLocation", "/tmo/checkpointLocation")
 .outputMode("update")
 .start()
 .awaitTermination()

Но я получаю эту ошибку ...

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.xxx.MyObject
- field (class: "com.xxx.MyObject", name: "value")
- root class: "com.xxx.MyObjectWithEventTime"

1 Ответ

0 голосов
/ 17 марта 2020

попытайтесь определить кодировщик для MyObjectWithEventTime и используйте метод Encoders.javaSerialization[T]:

implicit val myObjectEncoder: Encoder[MyObject] = Encoders.javaSerialization[MyObject]
implicit val myObjectWithEventEncoder: Encoder[MyObjectWithEventTime] = Encoders.javaSerialization[MyObjectWithEventTime]

Помните, ваш java класс MyObject должен реализовывать Serializable и реализовал методы получения и установки publi c для всех полей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...