Проблема с десериализацией данных Avro в Scala - PullRequest
0 голосов
/ 01 июля 2018

Я создаю приложение Apache Flink в Scala, которое считывает потоковые данные с шины Kafka, а затем выполняет на ней операции суммирования. Данные из Kafka представлены в формате Avro и требуют специального класса десериализации. Я нашел этот класс Scala AvroDeserializationScehema (http://codegists.com/snippet/scala/avrodeserializationschemascala_saveveltri_scala):

package org.myorg.quickstart
import org.apache.avro.io.BinaryDecoder
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DecoderFactory
import org.apache.avro.reflect.ReflectDatumReader
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.common.serialization._
import java.io.IOException

class AvroDeserializationSchema[T](val avroType: Class[T]) extends DeserializationSchema[T] {
  private var reader: DatumReader[T] = null
  private var decoder : BinaryDecoder = null

  def deserialize(message: Array[Byte]): T = {
    ensureInitialized()
    try {
      decoder = DecoderFactory.get.binaryDecoder(message, decoder)
      reader.read(null.asInstanceOf[T], decoder)
    }
    catch {
      case e: IOException => {
        throw new RuntimeException(e)
      }
    }
  }

  def isEndOfStream(nextElement: T): Boolean = false


  def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(avroType)

  private def ensureInitialized() {
    if (reader == null) {
      if (classOf[SpecificRecordBase].isAssignableFrom(avroType)) {
        reader = new SpecificDatumReader[T](avroType)
      }
      else {
        reader = new ReflectDatumReader[T](avroType)
      }
    }
  }
}

В моем потоковом классе я использую это следующим образом:

val stream = env
        .addSource(new FlinkKafkaConsumer010[String]("test", new 
AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))

где DeviceData - это класс случая Scala, определенный в том же проекте

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                    sw_version: String,
                    timestamp: String,
                    reading: Double
                   )

При компиляции класса StreamingKafkaClient.scala появляется следующая ошибка

Error:(24, 102) object java.lang.Class is not a value
        .addSource(new FlinkKafkaConsumer010[String]("test", new 
AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))

Также пробовал

val stream = env
        .addSource(new FlinkKafkaConsumer010[String]("test", new 
AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))

При этом я получаю другую ошибку:

Error:(21, 20) overloaded method constructor FlinkKafkaConsumer010 with alternatives:
  (x$1: java.util.regex.Pattern,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: java.util.regex.Pattern,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: java.util.List[String],x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: java.util.List[String],x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
  (x$1: String,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String]
 cannot be applied to (String, org.myorg.quickstart.AvroDeserializationSchema[org.myorg.quickstart.DeviceData], java.util.Properties)
        .addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))

Я совершенно новичок в Scala (это моя первая программа для Scala), поэтому я знаю, что мне здесь не хватает чего-то фундаментального. Когда я пытаюсь выучить Scala, кто-нибудь может указать, что я делаю не так. Мое намерение состоит в том, чтобы в основном считывать закодированные в Avro данные из Kafka во Flink и выполнять некоторые операции с потоковыми данными. Я не смог найти никаких примеров использования класса AvroDeserializationSchema, мне кажется, это то, что должно быть встроено в пакеты Flink.

1 Ответ

0 голосов
/ 02 июля 2018

Чтобы получить объект класса в Scala, вам нужно classOf[DeviceData], а не Class[DeviceData]

new AvroDeserializationSchema[DeviceData](classOf[DeviceData])

Я не смог найти никаких примеров использования класса AvroDeserializationSchema

Я нашел один (на Java)

Кроме того, похоже, что в выпуске Flink 1.6 они будут добавлять этот класс, а не копировать его из других источников. FLINK-9337 & FLINK-9338

Как уже упоминалось в комментариях, если вы хотите использовать Confluent Avro Schema Registry вместо указания типа класса, посмотрите этот ответ или обратитесь к коду в приведенной выше ссылке Github

Кроме того, если вы используете Kafka 0.11+ (или Confluent 3.3+), то в идеале вы должны использовать FlinkKafkaConsumer011 вместе с классом, который вы десериализуете до

new FlinkKafkaConsumer011[DeviceData]
...