Приложение Kafka Streams - подсчет и суммирование - PullRequest
0 голосов
/ 28 февраля 2019

Я пытаюсь создать KTable из KGroupedStream для хранения суммы значения для каждого ключа.

 final StreamsBuilder builder = new StreamsBuilder();
 final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
            .groupByKey()
            .aggregate(new Initializer<Long>() {
                @Override
                public Long apply() {
                    return Long.MIN_VALUE;
                }
            }, new Aggregator<String, Long, Long>() {
                @Override
                public Long apply(final String key, final Long value,final Long aggregate) {
                    aggregate += value;
                    return aggregate;
                }
            }, Materialized.<String, Long, KeyValueStore<Byte, byte[]>>as("counts-store"));

, но получаю ошибку:

The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Materialized<String,Long,KeyValueStore<Byte,byte[]>>)

Всепримеры, которые я видел, передают Serde в качестве третьего аргумента, но я попробовал это и получил очень похожую ошибку (я думаю, что это может быть из более старой версии, так как она не совпадает с сигнатурой текущей реализации?):

final StreamsBuilder builder = new StreamsBuilder();
    final KTable<String, Long> sum = builder.stream("streams-plaintext-input")
            .groupByKey()
            .aggregate(new Initializer<Long>() {
                @Override
                public Long apply() {
                    return Long.MIN_VALUE;
                }
            }, new Aggregator<String, Long, Long>() {
                @Override
                public Long apply(final String key, final Long value,final Long aggregate) {
                    aggregate += value;
                    return aggregate;
                }
            }, Serdes.Long());

Ошибка:

The method aggregate(Initializer<VR>, Aggregator<? super Object,? super Object,VR>, Materialized<Object,VR,KeyValueStore<Bytes,byte[]>>) in the type KGroupedStream<Object,Object> is not applicable for the arguments (new Initializer<Long>(){}, new Aggregator<String,Long,Long>(){}, Serde<Long>)

Что я делаю не так?

Использование версии Kafka: 2.1.0

1 Ответ

0 голосов
/ 28 февраля 2019

В вашем коде есть несколько проблем:

  1. Для Materialized.as вместо java.lang.Byte вы должны передать org.apache.kafka.common.utils.Bytes
  2. Вы не должны изменять final переменную:aggregate += value;
  3. Вы должны добавить типы ключей и значений к StreamsBuilder::stream вызову (builder.<String, Long>stream("streams-plaintext-input"))

После модификации он должен выглядеть примерно так:

KTable<String, Long> sum = builder.<String, Long>stream("streams-plaintext-input")
        .groupByKey()
        .aggregate(new Initializer<Long>() {
            @Override
            public Long apply() {
                return Long.MIN_VALUE;
            }
        }, new Aggregator<String, Long, Long>() {
            @Override
            public Long apply(final String key, final Long value,final Long aggregate) {
                return aggregate + value;
            }
        }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...