Я пытаюсь посчитать количество конвертов из темы.Сделка в авро-формате.Я использую этот пример в качестве ссылки.
final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, Transaction> transactionKStream = streamsBuilder.stream(INPUT_TOPIC);
final KStream<String, Integer> envelopes = transactionKStream.filter((k, v) -> v.getProduct().toString()
.matches("C4|C5"))
.map((k, v) -> KeyValue.pair("1", v.getAmount()));
final KTable<String, Integer> amount = envelopes
.groupByKey()
.reduce((v1, v2) -> v1 + v2);
Я хочу сохранить сумму в KTable <>, но когда я отправляю данные в тему ввода, потребитель вылетает с
A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Integer). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
Когда KTable <> закомментирован, он работает нормально.Но не суммируйте сумму.