Метод withValueSerde () в типе Materialized <> не применим - PullRequest
0 голосов
/ 26 июня 2018

Я группирую поток по ключу и пытаюсь агрегировать значения по сгруппированному ключу. Я следую за streams-developer-guide

Я получаю сообщение об ошибке withValueSerde. Там написано:

The method withValueSerde(Serde<Object>) in the type Materialized<Object,Object,StateStore> is not applicable for the arguments (Serde<Long>)

Код:

KStream<String, String> inputStream = builder.stream("input_topic");
KStream<String, Integer> transformedStream = inputStream.map(
        (key, value) ->  KeyValue.pair(getKey(value), getValue(value)));

KGroupedStream<String, Integer> groupedStream = transformedStream.groupByKey();

KTable<String, Long> aggregatedStream = groupedStream.aggregate(() -> 0L,
        (aggKey, newValue, aggValue) -> aggValue + newValue,
        Materialized.as("aggregated-stream-store").withValueSerde(Serdes.Long()));

1 Ответ

0 голосов
/ 26 июня 2018

Вам необходимо указать универсальные типы. Java не может определить их автоматически (если вы посмотрите на сообщение об ошибке, оно просто говорит Materialized<Object,Object,StateStore>, указывая неизвестные типы):

Materialized.<String, Long, KeyValueStore<Bytes, byte[]>as("aggregated-stream-store")
    .withValueSerde(Serdes.Long())
...