Это поведение по умолчанию KTable#toStream()
, оно преобразует список изменений topi c в KStream
, поэтому нижестоящий оператор reduce
обновляется каждый раз, когда восходящий оператор сокращения получает сообщение.
Вы можете заархивировать свое поведение по желанию, используя API процессора , в этом случае мы используем KStream.transfomerValues ().
Сначала зарегистрируйте KeyValueStore, чтобы сохранить ваше последнее значение:
//you don't need to add number_store, if your KTable already materialized to number_store
streamsBuilder
.addStateStore(Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("number_store"), Serdes.Long(), Serdes.Long()));
numberKStream
.transformValues(ExtractIfValueChangedTransformer::new, "number_store")
.filter((key, value) -> value != null)
.foreach((key, value) -> sendUpdate(key));
Затем мы создаем ExtractIfValueChangedTransformer
, возвращаемое значение нового сообщения, только если значение изменилось, если нет, то возвращаем нуль:
public class ExtractIfValueChangedTransformer implements ValueTransformerWithKey<Long, Long, Long> {
KeyValueStore<Long, Long> kvStore;
@Override
public void init(ProcessorContext context) {
kvStore = (KeyValueStore<Long, Long>) context.getStateStore("number_store");
}
@Override
public Long transform(Long key, Long newValue) {
Long oldValue = kvStore.get(key);
kvStore.put(key, newValue);
if (oldValue == null) return newValue;
return oldValue.equals(newValue) ? null : newValue;
}
@Override
public void close() {}
}