Я работаю с Kafka Streams & Kotlin над созданием сервиса, в котором есть потоки по трем темам. Первый из них имеет значение Avro, а два других имеют строковые значения.
В моем файле properties
в качестве значения по умолчанию Serde используется SpecificAvroSerde
, а затем я использую Consumed.with(Serdes.String(), Serdes.String())
для использования строки значения.
val topicOneStream = streamsBuilder.stream<String, AvroObject>(topicOne)
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
val topicTwoStream = streamsBuilder
.stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
.peek { k, _ -> logger.info("Received message with key: $k") }
.mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }
.flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }
Когда у меня есть конфигурация следующего потока в качестве значений по умолчанию, я вижу, что поток Avro (первый) работает нормально и потребляет то, что я публикую на этой топи c. Но я получаю исключение, когда публикую sh в потоках строковых значений, используя ту же конфигурацию.
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
Вот исключение из публикации в теме два и теме три:
org.apache.kafka.streams.errors.StreamsException: A serializer (io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer) is not compatible to the actual value type (value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
PS. Это должно быть три потока в одном сервисе, потому что позже будет соединение.