Kafka Ktable также передает дубликаты обновлений - PullRequest
1 голос
/ 21 апреля 2020

Kafka Ktable также передает дубликаты обновлений.

Я хочу обработать поток изменений Ktable (созданный с помощью Kstream.reduce ()), то есть любое изменение значения ключей в Ktable. Но кажется, что даже когда одна и та же пара значений ключа отправляется в Ktable несколько раз, она отправляется в нисходящем направлении каждый раз. Мне нужно отправить обновление в значении для ключа, только если значение изменится.

`

groupByKey(Grouped.with(new Serdes.LongSerde(),new Serdes.LongSerde())) 
                .reduce(new Reducer<Long>() {   
                    @Override
                    public Long apply(Long t1, Long t2) {
                        return t2;
                    }
                }).toStream().foreach((key, value) -> //for each update in ID, send update to the stream
        {

            sendUpdate(key); 
        });

`

1 Ответ

1 голос
/ 21 апреля 2020

Это поведение по умолчанию KTable#toStream(), оно преобразует список изменений topi c в KStream, поэтому нижестоящий оператор reduce обновляется каждый раз, когда восходящий оператор сокращения получает сообщение.

Вы можете заархивировать свое поведение по желанию, используя API процессора , в этом случае мы используем KStream.transfomerValues ​​().

Сначала зарегистрируйте KeyValueStore, чтобы сохранить ваше последнее значение:

//you don't need to add number_store, if your KTable already materialized to number_store
streamsBuilder
        .addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("number_store"), Serdes.Long(), Serdes.Long()));

numberKStream
        .transformValues(ExtractIfValueChangedTransformer::new, "number_store")
        .filter((key, value) -> value != null)
        .foreach((key, value) -> sendUpdate(key));

Затем мы создаем ExtractIfValueChangedTransformer, возвращаемое значение нового сообщения, только если значение изменилось, если нет, то возвращаем нуль:

public class ExtractIfValueChangedTransformer implements ValueTransformerWithKey<Long, Long, Long> {

    KeyValueStore<Long, Long> kvStore;

    @Override
    public void init(ProcessorContext context) {
        kvStore = (KeyValueStore<Long, Long>) context.getStateStore("number_store");
    }

    @Override
    public Long transform(Long key, Long newValue) {
        Long oldValue = kvStore.get(key);
        kvStore.put(key, newValue);
        if (oldValue == null) return newValue;
        return oldValue.equals(newValue) ? null : newValue;
    }

    @Override
    public void close() {}
}
...