Хранить сумму в KTable <String, Integer> - PullRequest
1 голос
/ 26 июня 2019

Я пытаюсь посчитать количество конвертов из темы.Сделка в авро-формате.Я использую этот пример в качестве ссылки.

final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, Transaction> transactionKStream = streamsBuilder.stream(INPUT_TOPIC);

final KStream<String, Integer> envelopes = transactionKStream.filter((k, v) -> v.getProduct().toString()
    .matches("C4|C5"))
    .map((k, v) -> KeyValue.pair("1", v.getAmount()));

final KTable<String, Integer> amount = envelopes
    .groupByKey()
    .reduce((v1, v2) -> v1 + v2);

Я хочу сохранить сумму в KTable <>, но когда я отправляю данные в тему ввода, потребитель вылетает с

A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Integer). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

Когда KTable <> закомментирован, он работает нормально.Но не суммируйте сумму.

1 Ответ

2 голосов
/ 26 июня 2019

groupByKey() использует сериализаторы по умолчанию:

groupByKey()

Сгруппируйте записи по их текущему ключу в KGroupedStream при сохранении исходных значений и значений по умолчанию сериализаторы и десериализаторы.

Вы должны использовать groupByKey(Serialized<K,V> serialized) или groupByKey(Grouped<K,V> grouped).

Следующее должно сделать трюк:

final KTable<String, Integer> amount = envelopes
    .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()))
    .reduce((v1, v2) -> v1 + v2);
...