Кафка потоков, создание KTable из заданной темы - PullRequest
0 голосов
/ 27 ноября 2018

Я делаю проект и застрял на KTable.

Я хочу взять записи из темы и поместить их в KTable (хранилище), чтобы у меня была 1 запись на 1ключ.

    static KafkaStreams streams;

    final Serde<Long> longSerde = Serdes.Long();
    final Serde<byte[]> byteSerde = Serdes.ByteArray();
    static String topicName;
    static String storeName;
    final StreamsBuilder builder = new StreamsBuilder();

    KStream<Long, byte[]> streamed = builder.stream(topicName, Consumed.with(longSerde, byteSerde));
    KTable<Long, byte[]> records = streamed.groupByKey().reduce(
            new Reducer<Long>() {
                @Override
                public Long apply(Long aggValue, Long newValue) {
                    return newValue;
                }
            }, 
            storeName);

Это самый близкий мне ответ, я думаю.

1 Ответ

0 голосов
/ 27 ноября 2018

Ваш подход правильный, но вам нужно использовать правильные serdes.

В функции .reduce () тип значения должен быть byte[].

 KStream<Long, byte[]> streamed = builder.stream(topicName, Consumed.with(longSerde, byteSerde));
 KTable<Long, byte[]> records = streamed.groupByKey().reduce(
            new Reducer<byte[]>() {
                @Override
                public byte[] apply(byte[] aggValue, byte[] newValue) {
                    return newValue;
                }
            }, 
            Materialized.as(storename).with(longSerde,byteSerde));
...