Универсальный Avro Serde с использованием бесформенного типа данных - PullRequest
0 голосов
/ 25 мая 2018

Я пытаюсь создать универсальный AvroSerde в Scala.Я буду использовать этот serde в сочетании с Flink , поэтому этот serde также должен быть сериализуемым сам.Avro не имеет встроенной поддержки Scala, однако есть некоторые библиотеки, которые позволяют преобразовывать классы дел в обычные записи, используя бесформенную форму.Примечание: этот универсальный сериализатор будет создаваться только с классами case.

Сначала я попытался реализовать этот serde, используя Avro4s .Я скомпилировал это довольно легко, убедившись, что универсальный тип был привязан к контексту FromRecord и RecordFrom, однако и FromRecord и RecordFrom не сериализуемы, поэтому я не могу использовать этот serde во Flink.

В настоящее время я пытаюсь использовать другую библиотеку shapeless-datatype , которая также использует бесформенную форму.Мой текущий код выглядит следующим образом:

class Serializer[T : TypeTag : ClassTag] {

  //Get type of the class at run time
  val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]

  //Get Avro Type
  val avroType = AvroType[T]

  def serialize(value : T) : Array[Byte] = {
    var schema: Schema = null

    if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
      schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
    } else {
      schema = ReflectData.get().getSchema(inputClassType)
    }

    val out: ByteArrayOutputStream = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    var writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)

    val genericRecord = avroType.toGenericRecord(value)

    writer.write(genericRecord, encoder)
    encoder.flush()
    out.close()

    out.toByteArray
  }

  def deserialize(message: Array[Byte]) : T = {
    var schema: Schema = null

    if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
      schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
    } else {
      schema = ReflectData.get().getSchema(inputClassType)
    }

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(message, null)

    avroType.fromGenericRecord(datumReader.read(null, decoder)).get
  }


}

Так что в основном я создаю AvroType[T], который имеет два метода fromGenericRecord и toGenericRecord ( source ).Эти методы требуют некоторых последствий: LabelledGeneric.Aux[A, L], ToAvroRecord[L], tt: TypeTag[A] и fromL: FromAvroRecord[L].

В настоящее время этот код выдает ошибки компиляции из-за отсутствия этих имплицитов:

Error:(48, 51) could not find implicit value for parameter gen: shapeless.LabelledGeneric.Aux[T,L]
  val genericRecord = avroType.toGenericRecord(value)

Простая перегрузка имплицитов из методов toGenericRecord и fromGenericRecord не решает проблему, потому что тогда мне нужнопараметризовать serialize[L <: Hlist] и deserialize[L <: Hlist], что я не могу сделать, потому что Flink не позволяет этим методам иметь типы.

У меня мало опыта как с бесформенным, так и с имплицитным пониманием того, какие границы контекста мне нужно, чтобы решить эту проблему, и в то же время поддерживать сериализацию этого класса.

Надеюсь, кто-то может помочь или указать мне на некоторые полезные ресурсы.

Спасибо, Wouter

РЕДАКТИРОВАТЬ

Я не могупередавать имплициты через методы и не делать их параметризованными, поскольку мне нужно основать serde на интерфейсах сериализации Flink, что заставляет меня переопределять: byte[] serialize(T element) и T deserialize(byte[] message)

Если я пытаюсь передать неявное значение всам класс, мне нужно изменить его на:

class Serializer[T : TypeTag : ClassTag, L <: HList](implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L], fromL: FromAvroRecord[L])

, но затем, если я создаю его экземпляр следующим образом:

case class Test(str: String)
val serializer = new Serializer[Test]

Я получаю эту ошибку компиляции:

Error:(29, 26) wrong number of type arguments for shapeless.datatype.avro.Serializer, should be 2
val serializer = new Serializer[Test]

1 Ответ

0 голосов
/ 25 мая 2018

Вы должны сделать Serializer класс типа .(Кстати, использование var s без необходимости - плохая практика.)

import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.ReflectData
import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import shapeless.datatype.avro.{AvroType, FromAvroRecord, ToAvroRecord}
import shapeless.{HList, LabelledGeneric}  
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.{ClassTag, classTag}

trait Serializer[T] extends SerializationSchema[T] with DeserializationSchema[T] {
  type L <: HList
}

object Serializer {
  type Aux[T, L0 <: HList] = Serializer[T] { type L = L0 }

  def apply[T](implicit serializer: Serializer[T]): Serializer[T] = serializer

  implicit def mkSerializer[T : ClassTag : TypeTag, L0 <: HList](implicit
    gen: LabelledGeneric.Aux[T, L0],
    toL: ToAvroRecord[L0],
    fromL: FromAvroRecord[L0]): Aux[T, L0] =
    new Serializer[T] {
      type L = L0

      //Get type of the class at run time
      val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]

      //Get Avro Type
      val avroType = AvroType[T]

      override def serialize(value : T) : Array[Byte] = {
        val schema: Schema =
          if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
            inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
          else ReflectData.get().getSchema(inputClassType)

        val out: ByteArrayOutputStream = new ByteArrayOutputStream()
        val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
        val writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)

        val genericRecord = avroType.toGenericRecord(value)

        writer.write(genericRecord, encoder)
        encoder.flush()
        out.close()

        out.toByteArray
      }

      override def deserialize(message: Array[Byte]) : T = {
        val schema: Schema =
          if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
            inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
          else ReflectData.get().getSchema(inputClassType)

        val datumReader = new GenericDatumReader[GenericRecord](schema)
        val decoder = DecoderFactory.get().binaryDecoder(message, null)

        avroType.fromGenericRecord(datumReader.read(null, decoder)).get
      }

      override def isEndOfStream(nextElement: T): Boolean = ???

      override def getProducedType: TypeInformation[T] = ???
    }
}

case class Test(str: String)    
val serializer = Serializer[Test]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...