Вы можете использовать хранилище состояний (если вы используете потоки Kafka), а затем добавить к нему процессор, который обновляет хранилище состояний каждый раз, когда в тему добавляется новое значение.
builder.addGlobalStore(storeBuilder, topic, Consumed.with(keySerde, valueSerde), return new Processor<K,V>() {
private KeyValueStore<K,V> store;
public void init(ProcessorContext context) {
store=(KeyValueStore<K,V>) context.getStateStore("statestorename");
}
public void process(K key, V value) {
store.put(key,value);
}
public void close() {}
});
и тогда вы можете использовать
readOnlyStore=streams.store("statestorename", QueryableStoreTypes.keyValueStore());
readOnlyStore.get("key");