Kafka Streams StreamsException, когда я работаю с несколькими потоками - PullRequest
1 голос
/ 05 февраля 2020

Я работаю с Kafka Streams & Kotlin над созданием сервиса, в котором есть потоки по трем темам. Первый из них имеет значение Avro, а два других имеют строковые значения.

В моем файле properties в качестве значения по умолчанию Serde используется SpecificAvroSerde, а затем я использую Consumed.with(Serdes.String(), Serdes.String()) для использования строки значения.

    val topicOneStream = streamsBuilder.stream<String, AvroObject>(topicOne)
        .peek { k, _ -> logger.info("Received message with key: $k") }
        .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }

    val topicTwoStream = streamsBuilder
        .stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
        .peek { k, _ -> logger.info("Received message with key: $k") }
        .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }

    val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
        .peek { k, _ -> logger.info("Received message with key: $k") }
        .mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }
        .flatMapValues { v -> listOf(v) }.groupByKey().reduce { v1, _ -> v1 }

Когда у меня есть конфигурация следующего потока в качестве значений по умолчанию, я вижу, что поток Avro (первый) работает нормально и потребляет то, что я публикую на этой топи c. Но я получаю исключение, когда публикую sh в потоках строковых значений, используя ту же конфигурацию.

default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Вот исключение из публикации в теме два и теме три:

org.apache.kafka.streams.errors.StreamsException: A serializer (io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer) is not compatible to the actual value type (value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

PS. Это должно быть три потока в одном сервисе, потому что позже будет соединение.

1 Ответ

0 голосов
/ 12 февраля 2020

Благодаря другу (Марио Бойкову), проблема возникает, когда Кафка делает группировку для получения нового KTable. Он не знает, какой сериализатор взять для группировки, поэтому он принимает значение Serde по умолчанию, которое в моем случае SpecificAvroSerde

Это было решено путем предоставления groupByKey сериализатора, необходимого для группировки :

val topicTwoStream = streamsBuilder
    .stream<String, String>(topicTwo, Consumed.with(Serdes.String(), Serdes.String()))
    .peek { k, _ -> logger.info("Received message with key: $k") }
    .flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 }

val topicThreeStream = streamsBuilder.stream<String, String>(topicThree, Consumed.with(Serdes.String(), Serdes.String()))
    .peek { k, _ -> logger.info("Received message with key: $k") }
    .flatMapValues { v -> listOf(v) }.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).reduce { v1, _ -> v1 }
    .mapValues { v -> objectMapper.readValue(v, AdviceCreated::class.java) }

Приветствия ?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...