Попытка определить сериализатор, используя avro4s, но получить пропущенную неявную ошибку - PullRequest
0 голосов
/ 07 мая 2019

Я использую клиент Flink (1.7) kafka и Avro4s (2.0.4), я хочу сериализовать в байтовый массив:

class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] extends SerializationSchema[IN] {
  override def serialize(element: IN): Array[Byte] = {
    val str = AvroSchema[IN]
    val schema: Schema = new Parser().parse(str.toString)
    val out = new ByteArrayOutputStream()
    val os = AvroOutputStream.data[IN].to(out).build(schema)
    os.write(element)
    out.close()
    out.flush()
    os.flush()
    os.close()
    out.toByteArray
  }
}

Однако я получаю это исключение:

Error:(15, 35) could not find implicit value for evidence parameter of type com.sksamuel.avro4s.Encoder[IN]
    val os = AvroOutputStream.data[IN].to(out).build(schema)

и

Error:(15, 35) not enough arguments for method data: (implicit evidence$3: com.sksamuel.avro4s.Encoder[IN])com.sksamuel.avro4s.AvroOutputStreamBuilder[IN].
Unspecified value parameter evidence$3.
    val os = AvroOutputStream.data[IN].to(out).build(schema)

Ответы [ 2 ]

1 голос
/ 22 мая 2019

Вам не нужно использовать FromRecord при записи в выходной поток.Это для людей, которые хотят иметь GenericRecord для собственного использования.Вам нужно использовать Encoder.

class AvroSerializationSchema[IN : SchemaFor : Encoder] extends SerializationSchema[IN] {
  override def serialize(element: IN): Array[Byte] = {
    val str = AvroSchema[IN]
    val schema: Schema = new Parser().parse(str.toString)
    val out = new ByteArrayOutputStream()
    val os = AvroOutputStream.data[IN].to(out).build(schema)
    os.write(element)
    out.close()
    out.flush()
    os.flush()
    os.close()
    out.toByteArray
  }
}
1 голос
/ 07 мая 2019

Согласно коду IN должен быть Encoder тип:

object AvroOutputStream {

  /**
    * An [[AvroOutputStream]] that does not write the schema. Use this when
    * you want the smallest messages possible at the cost of not having the schema available
    * in the messages for downstream clients.
    */   def binary[T: Encoder] = new AvroOutputStreamBuilder[T](BinaryFormat)

  def json[T: Encoder] = new AvroOutputStreamBuilder[T](JsonFormat)

  def data[T: Encoder] = new AvroOutputStreamBuilder[T](DataFormat)
}

, поэтому он должен выглядеть примерно так:

class AvroSerializationSchema[IN : Encoder] ...
...