Агрегат KStream с пользовательским типом значения и известным хранилищем состояний - PullRequest
0 голосов
/ 12 июня 2019

Я пытаюсь выполнить агрегацию в потоке с ключом типа String и значением пользовательского типа - следующим образом

stream.groupByKey(Grouped.with(Serdes.String(),barSerde))
      .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
      .aggregate(Foo::new,
                 (String key, Bar bar, Foo foo) -> {
                   foo.updateMap(bar.getKey(), bar.getValue());
                   return foo;
                 }, 
                 Materialized.with(Serdes.String(), fooSerde));

Мне удалось получить требуемые результаты, пока я не начал задавать хранилище состоянийв Materialized работает следующим образом

stream.groupByKey(Grouped.with(Serdes.String(),barSerde))
      .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
      .aggregate(Foo::new,
                 (String key, Bar bar, Foo foo) -> {
                   foo.updateMap(bar.getKey(), bar.getValue());
                   return foo;
                 }, 
                 Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(storeTopic))
                   .withKeySerde(Serdes.String())
                   .withValueSerde(fooSerde));

Я получаю ошибку компиляции

Error:(122, 17) java: no suitable method found for aggregate(Foo::new,(key,daBea[...]an; },org.apache.kafka.streams.kstream.Materialized<java.lang.String,com.test.bean.Foo,org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>>)
method org.apache.kafka.streams.kstream.TimeWindowedKStream.<VR>aggregate(org.apache.kafka.streams.kstream.Initializer<VR>,org.apache.kafka.streams.kstream.Aggregator<? super java.lang.String,? super com.test.bean.Bar,VR>) is not applicable
  (cannot infer type-variable(s) VR
    (actual and formal argument lists differ in length))
method org.apache.kafka.streams.kstream.TimeWindowedKStream.<VR>aggregate(org.apache.kafka.streams.kstream.Initializer<VR>,org.apache.kafka.streams.kstream.Aggregator<? super java.lang.String,? super com.test.bean.Bar,VR>,org.apache.kafka.streams.kstream.Materialized<java.lang.String,VR,org.apache.kafka.streams.state.WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>>) is not applicable
  (cannot infer type-variable(s) VR
    (argument mismatch; org.apache.kafka.streams.kstream.Materialized<java.lang.String,com.test.bean.Foo,org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> cannot be converted to org.apache.kafka.streams.kstream.Materialized<java.lang.String,VR,org.apache.kafka.streams.state.WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>>))

Как указать и Serdes, и раздел stateStore с помощью Materialized?

1 Ответ

0 голосов
/ 12 июня 2019

Поскольку вы выполняете агрегацию в окнах, ожидаемый тип хранилища - не KeyValueStore, а WindowStore.

...