Я использую процессор API для удаления сообщений из хранилища состояний. Удаление работает успешно, я подтвердил с помощью интерактивных запросов вызов в хранилище состояний по ключу kafka, но это не уменьшает размер файла потоков kafka на локальном диске в каталоге tmp / kafka-streams.
@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
@Override
public void punctuate(long l) {
processorContext.commit();
}
}); //invoke punctuate every 12 seconds
this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
log.info("Processor initialized");
}
@Override
public void process(String key, GenericRecord value) {
statestore.all().forEachRemaining(keyValue -> {
statestore.delete(keyValue.key);
});
}
kafka Размер каталога потоков
2.3M /private/tmp/kafka-streams
3.3M /private/tmp/kafka-streams
Нужна ли какая-либо конкретная c конфигурация, чтобы она контролировала размер файла? Если это не работает, можно ли удалить каталог kafka-streams? Я предполагаю, что это должно быть безопасно, так как такое удаление приведет к удалению записи как из хранилища состояний, так и из списка изменений topi c.