Класс серде на заказ JSON (Kotlin & Кафка) - PullRequest
0 голосов
/ 04 апреля 2020

Я пишу потоковое приложение kafka в Kotlin, которое потребляет сообщение JSON (без AVRO или реестра схемы).

В MyMessage.kt я объявил класс MyMessage как @Serializable.

MyMessage.kt

import kotlinx.serialization.Serializable

@Serializable
data class MyMessage(val from: String, val to: String, val msg: String)

Streaming.kt

val s: KString<String, MyMessage> = streamsBuilder()
    .stream("my", Consumed.with(Serdes.String, Serdes.serdeFrom(MyMessage::class.java)

При выполнении этого я получаю следующую ошибку в строке выше:

Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID

Чего мне не хватает?

1 Ответ

1 голос
/ 04 апреля 2020

Аргумент для Serdes.serdesFrom() ожидает объект Serializer и Deserializer (оба интерфейса являются интерфейсами Kafka из пакета org.apache.kafka.common.serialization и не имеют ничего общего с аннотацией @Serializable.

Вам нужно создать классы MyMessageSerializer extends Serializer и MyMessageDeserialzer extends Deserializer и передать эти объекты в метод. Для реализации фактической (де) сериализации с обоими классами вы можете использовать сериализацию по умолчанию, если хотите.

...