Хранилище состояния потока Kafka при удалении сообщений размером файла не уменьшается - PullRequest
0 голосов
/ 12 февраля 2020

Я использую процессор API для удаления сообщений из хранилища состояний. Удаление работает успешно, я подтвердил с помощью интерактивных запросов вызов в хранилище состояний по ключу kafka, но это не уменьшает размер файла потоков kafka на локальном диске в каталоге tmp / kafka-streams.

@Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
        processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
            @Override
            public void punctuate(long l) {
                processorContext.commit();
            }
        }); //invoke punctuate every 12 seconds
        this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
        log.info("Processor initialized");
    }

    @Override
    public void process(String key, GenericRecord value) {
        statestore.all().forEachRemaining(keyValue -> {
            statestore.delete(keyValue.key);
        });
    }

kafka Размер каталога потоков

2.3M    /private/tmp/kafka-streams
3.3M    /private/tmp/kafka-streams

Нужна ли какая-либо конкретная c конфигурация, чтобы она контролировала размер файла? Если это не работает, можно ли удалить каталог kafka-streams? Я предполагаю, что это должно быть безопасно, так как такое удаление приведет к удалению записи как из хранилища состояний, так и из списка изменений topi c.

1 Ответ

1 голос
/ 13 февраля 2020

RocksDB выполняет сжатие файлов в фоновом режиме. Следовательно, если вам нужно более агрессивное сжатие, вы должны передать пользовательский RocksDBConfigSetter через параметр конфигурации Streams rocksdb.config.setter. Для получения более подробной информации о RockDB, ознакомьтесь с документацией RocksDB.

https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb -config-setter

Однако я не рекомендовал бы изменять конфигурации RocksDB, если нет никакой реальной проблемы - вы можете принести больше вреда, чем пользы. Похоже, что размер хранилища у вас довольно маленький, поэтому я не вижу реальной проблемы в банкомате.

Кстати: вы должны настроить state.dir на случай, если вы go перейдете в производство и не переведете состояние в состояние по умолчанию /tmp местоположение.

...