Исключительная ситуация при десериализации потока байтов в объект класса случая Scala - PullRequest
1 голос
/ 30 мая 2019

Я пытаюсь десериализовать поток байтов avro в объект класса случая scala. По сути, у меня был поток kafka с потоком данных, закодированных в avro, и теперь есть дополнение к схеме, и я пытаюсь обновить класс случая scala для включения нового поля. Класс case выглядит следующим образом

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                sw_version: String,
                timestamp: String,
                reading: Double,
                new_field: Option[String] = None
               )  {

this () = this ("na", "na", "na", 0, None) }

Схема avro выглядит следующим образом:

{
  "type": "record",
  "name": "some_name",
  "namespace": "some_namespace",
  "fields": [
    {
      "name": "deviceId",
      "type": "string"
    },
    {
      "name": "sw_version",
      "type": "string"
    }, 
    {
      "name": "timestamp",
      "type": "string"
    },
    {
      "name": "reading",
      "type": "double"
    },
    {
      "name": "new_field",
     "type": ["null", "string"],
      "default": null
    }]}

Когда данные получены, я получаю следующее исключение:

java.lang.RuntimeException: java.lang.InstantiationException

Я могу получать данные, просто отлично, для потребителя, написанного на python, так что я знаю, что данные передаются в правильном формате в правильном формате. Я подозреваю, что проблема заключается в создании конструктора класса case, я пытался сделать это:

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                sw_version: String,
                timestamp: String,
                reading: Double,
                new_field: Option[String]
               )  {
this() = this("na", "na", "na", 0, some("na"))
}

но не повезло.

Код десериализатора (выдержки):

// reader and decoder for reading avro records
private var reader: DatumReader[T] = null
private var decoder : BinaryDecoder = null
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)

Я не смог найти других примеров наличия конструкторов для классов дел, которые используются для десериализации avro, в прошлом году я написал соответствующий вопрос java.lang.NoSuchMethodException для метода init в классе дел Scala и основываясь на ответе, я смог реализовать свой текущий код, который с тех пор работал нормально.

1 Ответ

1 голос
/ 01 июня 2019

Я решил эту проблему, следуя совершенно другому подходу. Я использовал клиент Confluent Kafka, как показано в этом примере https://github.com/jfrazee/schema-registry-examples/tree/master/src/main/scala/io/atomicfinch/examples/flink. У меня также есть реестр схемы Confluent, который очень легко настроить, используя контейнеризованное все в одном решении, которое поставляется с kafka и реестром схемы https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html.

Мне пришлось добавить конфлюентные зависимости и репо в мой файл pom.xml. Это идет в разделе хранилища.

<repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
</repository>

Это относится к разделу зависимости:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <!-- For Confluent Platform 5.2.1 -->
    <version>5.2.1</version>
</dependency>

С помощью кода, предоставленного в https://github.com/jfrazee/schema-registry-examples/blob/master/src/main/scala/io/atomicfinch/examples/flink/ConfluentRegistryDeserializationSchema.scala, я смог поговорить с реестром схемы Confluent, а затем на основе идентификатора схемы в заголовке сообщения avro это загружает схему из reg схемы и возвращает мне объект GenericRecord из которого я могу легко получить любые поля интереса и создать новый DataStream объекта DeviceData.

val kafka_consumer = new FlinkKafkaConsumer010("prod.perfwarden.minute",
  new ConfluentRegistryDeserializationSchema[GenericRecord](classOf[GenericRecord], "http://localhost:8081"),
  properties)
val device_data_stream = env
  .addSource(kafka_consumer)
  .map({x => new DeviceData(x.get("deviceId").toString,
    x.get("sw_version").toString,
    x.get("timestamp").toString,
    x.get("reading").toString.toDouble,
    x.get("new_field").toString)})

Конфлюентный клиент kafka заботится о десериализации потока байтов avro согласно схеме, включая значения по умолчанию. Настройка реестра схемы и использование слитного клиента kafka может занять немного времени, чтобы привыкнуть, но, вероятно, это лучшее долгосрочное решение, только мои 2 цента.

...