Поскольку я работаю со значениями Json, я не установил значения по умолчанию.
Я обрабатываю KStream, потребляя его с необходимыми пружинами и сортами продукта (json), но следующий шаг (операция отображения) завершается неудачно:
val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = applicationName
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBootstrapServers
val productSerde: Serde<Product> = Serdes.serdeFrom(JsonPojoSerializer<Product>(), JsonPojoDeserializer(Product::class.java))
builder.stream(INVENTORY_TOPIC, Consumed.with(Serdes.String(), productSerde))
.map { key, value ->
KeyValue(key, XXX)
}
.aggregate(...)
Если я удаляю операцию карты, выполнение идет нормально.
Я не нашел способа указать serdes для карты (), как это сделать?
Ошибка:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.codependent.kafkastreams.inventory.dto.Product). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)