Очистка постоянного магазина Kafka Streams - PullRequest
0 голосов
/ 13 декабря 2018

Требуется ли какая-то явная очистка, чтобы предотвратить постоянное увеличение размера каждого постоянного магазина?В настоящее время я использую его для расчета агрегации в DSL API.

1 Ответ

0 голосов
/ 14 декабря 2018

У нас была похожая проблема, мы просто запланировали работу по очистке магазина в нашем процессоре / трансформаторе.Просто реализуйте свой isDataOld (nextValue), и все готово.

@Override
public void init(ProcessorContext context) {
this.kvStore = (KeyValueStore<Key, Value>) this.context.getStateStore("KV_STORE_NAME");
this.context.schedule(60000, PunctuationType.STREAM_TIME, (timestamp) -> {
    KeyValueIterator<Key, Value> iterator = kvStore.all();
    while (iterator.hasNext()){
    KeyValue<Key,Value> nextValue = iterator.next();
    if isDataOld(nextValue)
       kvStore.delete(nextValue.key);
    }

});
}
...