Я пытаюсь создать универсальный AvroSerde в Scala.Я буду использовать этот serde в сочетании с Flink , поэтому этот serde также должен быть сериализуемым сам.Avro не имеет встроенной поддержки Scala, однако есть некоторые библиотеки, которые позволяют преобразовывать классы дел в обычные записи, используя бесформенную форму.Примечание: этот универсальный сериализатор будет создаваться только с классами case.
Сначала я попытался реализовать этот serde, используя Avro4s .Я скомпилировал это довольно легко, убедившись, что универсальный тип был привязан к контексту FromRecord
и RecordFrom
, однако и FromRecord
и RecordFrom
не сериализуемы, поэтому я не могу использовать этот serde во Flink.
В настоящее время я пытаюсь использовать другую библиотеку shapeless-datatype , которая также использует бесформенную форму.Мой текущий код выглядит следующим образом:
class Serializer[T : TypeTag : ClassTag] {
//Get type of the class at run time
val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
//Get Avro Type
val avroType = AvroType[T]
def serialize(value : T) : Array[Byte] = {
var schema: Schema = null
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
} else {
schema = ReflectData.get().getSchema(inputClassType)
}
val out: ByteArrayOutputStream = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
var writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)
val genericRecord = avroType.toGenericRecord(value)
writer.write(genericRecord, encoder)
encoder.flush()
out.close()
out.toByteArray
}
def deserialize(message: Array[Byte]) : T = {
var schema: Schema = null
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
} else {
schema = ReflectData.get().getSchema(inputClassType)
}
val datumReader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get().binaryDecoder(message, null)
avroType.fromGenericRecord(datumReader.read(null, decoder)).get
}
}
Так что в основном я создаю AvroType[T]
, который имеет два метода fromGenericRecord
и toGenericRecord
( source ).Эти методы требуют некоторых последствий: LabelledGeneric.Aux[A, L]
, ToAvroRecord[L]
, tt: TypeTag[A]
и fromL: FromAvroRecord[L]
.
В настоящее время этот код выдает ошибки компиляции из-за отсутствия этих имплицитов:
Error:(48, 51) could not find implicit value for parameter gen: shapeless.LabelledGeneric.Aux[T,L]
val genericRecord = avroType.toGenericRecord(value)
Простая перегрузка имплицитов из методов toGenericRecord
и fromGenericRecord
не решает проблему, потому что тогда мне нужнопараметризовать serialize[L <: Hlist]
и deserialize[L <: Hlist]
, что я не могу сделать, потому что Flink не позволяет этим методам иметь типы.
У меня мало опыта как с бесформенным, так и с имплицитным пониманием того, какие границы контекста мне нужно, чтобы решить эту проблему, и в то же время поддерживать сериализацию этого класса.
Надеюсь, кто-то может помочь или указать мне на некоторые полезные ресурсы.
Спасибо, Wouter
РЕДАКТИРОВАТЬ
Я не могупередавать имплициты через методы и не делать их параметризованными, поскольку мне нужно основать serde на интерфейсах сериализации Flink, что заставляет меня переопределять: byte[] serialize(T element)
и T deserialize(byte[] message)
Если я пытаюсь передать неявное значение всам класс, мне нужно изменить его на:
class Serializer[T : TypeTag : ClassTag, L <: HList](implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L], fromL: FromAvroRecord[L])
, но затем, если я создаю его экземпляр следующим образом:
case class Test(str: String)
val serializer = new Serializer[Test]
Я получаю эту ошибку компиляции:
Error:(29, 26) wrong number of type arguments for shapeless.datatype.avro.Serializer, should be 2
val serializer = new Serializer[Test]