Преобразование Case-классов с параметрами в качестве Case-классов в Avro Сообщение для отправки в Kafka - PullRequest
4 голосов
/ 16 мая 2019

Я использую класс case, который имеет вложенные классы case, и Seq[Nested Case Classes] Проблема заключается в том, что когда я пытаюсь сериализовать его, используя KafkaAvroSerializer, он выдает:

Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema(AbstractKafkaAvroSerDe.java:115)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:879)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:728)```

1 Ответ

2 голосов
/ 17 мая 2019

Если вы хотите использовать Avro с такими конструкциями Scala, как классы case, я рекомендую использовать Avro4s .Он имеет встроенную поддержку всех функций Scala и может даже создать схему из вашей модели, если вы этого хотите.

Есть некоторые ошибки с автоматическим выводом классов типов.Это то, что я узнал.

Используйте по крайней мере avro4s версии 2.0.4

Некоторые макросы генерируют код с предупреждениями компилятора, а также устраняют проблемы с удалением бородавок.Нам пришлось добавить следующие аннотации, чтобы заставить наш код компилироваться (иногда ошибка не может быть найдена неявно, но это вызвано ошибкой в ​​сгенерированном макросом коде):

@com.github.ghik.silencer.silent
@SuppressWarnings(Array("org.wartremover.warts.Null", "org.wartremover.warts.AsInstanceOf", "org.wartremover.warts.StringPlusAny"))

Следующая автоматическая деривация класса типов работает только однауровень за один раз.Я создал объект для хранения всех моих экземпляров SchemaFor, Decoder и Encoder для моей схемы.Затем я создал экземпляры классов типов, явно начиная с большинства внутренних типов.Я также использовал implicitly, чтобы убедиться, что каждый ADT разрешится, прежде чем перейти к следующему.Например:

sealed trait Notification
object Notification {
  final case class Outstanding(attempts: Int) extends Notification
  final case class Complete(attemts: Int, completedAt: Instant) extends Notification
}

sealed trait Job
final case class EnqueuedJob(id: String, enqueuedAt: Instant) extends Job
final case class RunningJob(id: String, enqueuedAt: Instant, startedAt: Instant) extends Job
final case class FinishedJob(id: String, enqueuedAt: Instant, startedAt: Instant, completedAt: Instant) extends Job

object Schema {

  // Explicitly define schema for ADT instances
  implicit val schemaForNotificationComplete: SchemaFor[Notification.Complete] = SchemaFor.applyMacro
  implicit val schemaForNotificationOutstanding: SchemaFor[Notification.Outstanding] = SchemaFor.applyMacro

  // Verify Notification ADT is defined
  implicitly[SchemaFor[Notification]]
  implicitly[Decoder[Notification]]
  implicitly[Encoder[Notification]]

  // Explicitly define schema, decoder and encoder for ADT instances
  implicit val schemaForEnqueuedJob: SchemaFor[EnqueuedJob] = SchemaFor.applyMacro
  implicit val decodeEnqueuedJob: Decoder[EnqueuedJob] = Decoder.applyMacro
  implicit val encodeEnqueuedJob: Encoder[EnqueuedJob] = Encoder.applyMacro

  implicit val schemaForRunningJob: SchemaFor[RunningJob] = SchemaFor.applyMacro
  implicit val decodeRunningJob: Decoder[RunningJob] = Decoder.applyMacro
  implicit val encodeRunningJob: Encoder[RunningJob] = Encoder.applyMacro

  implicit val schemaForFinishedJob: SchemaFor[FinishedJob] = SchemaFor.applyMacro
  implicit val decodeFinishedJob: Decoder[FinishedJob] = Decoder.applyMacro
  implicit val encodeFinishedJob: Encoder[FinishedJob] = Encoder.applyMacro

  // Verify Notification ADT is defined
  implicitly[Encoder[Job]]
  implicitly[Decoder[Job]]
  implicitly[SchemaFor[Job]]

  // And so on until complete nested ADT is defined
}
...