Фильтр уплотнения Flinkrocksdb не работает - PullRequest
1 голос
/ 11 марта 2020

У меня есть кластер Flink. Я включил фильтр уплотнения и использовал состояние TTL. но Rocksdb Compaction Filter не освобождает состояния из памяти.

У меня около 300 записей в секунду в моем конвейере Flink

Конфигурация TTL моего состояния:

@Override
public void open(Configuration parameters) throws Exception {
    ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<ObjectNode>(
            "my-state",
            TypeInformation.of(new TypeHint<ObjectNode>() {})
    );


    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(600))
            .cleanupInRocksdbCompactFilter(2)
            .build();

    descriptor.enableTimeToLive(ttlConfig);

    myState = getRuntimeContext().getListState(descriptor);
}

flink- conf.yaml:

state.backend: rocksdb
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.block.blocksize: 16kb
state.backend.rocksdb.compaction.level.use-dynamic-size: true
state.backend.rocksdb.thread.num: 4
state.checkpoints.dir: file:///opt/flink/checkpoint
state.backend.rocksdb.timer-service.factory: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 2
state.backend.local-recovery: true
state.backend.rocksdb.localdir: /opt/flink/rocksdb
jobmanager.execution.failover-strategy: region
rest.port: 8081
state.backend.rocksdb.memory.managed: true
# state.backend.rocksdb.memory.fixed-per-slot: 20mb
state.backend.rocksdb.memory.write-buffer-ratio: 0.9
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
taskmanager.memory.managed.fraction: 0.6
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 500mb
taskmanager.memory.network.max: 700mb
taskmanager.memory.process.size: 5500mb
taskmanager.memory.task.off-heap.size: 800mb

metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: ####
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: ####
metrics.reporter.influxdb.username: ####
metrics.reporter.influxdb.password: ####
metrics.reporter.influxdb.consistency: ANY
metrics.reporter.influxdb.connectTimeout: 60000
metrics.reporter.influxdb.writeTimeout: 60000

state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.background-errors: true
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.compaction-pending: true

Мониторинг Influxdb и Grafana:

enter image description here

1 Ответ

2 голосов
/ 11 марта 2020

Как следует из названия этой очистки TTL ( cleanupInRocksdbCompactFilter ), он опирается на пользовательский фильтр уплотнения RocksDB, который выполняется только во время уплотнений. Подробнее в docs .

Метрики на скриншоте показывают, что не было запущенных уплотнений все время. Я полагаю, что размер данных просто недостаточно велик, чтобы начать любое сжатие в данный момент времени.

Фильтр сжатия не освобождает состояния из памяти .

Я предполагаю, что основная память RAM имеет в виду выражение «из памяти». Если так, то сжатие там вообще не выполняется. Размер данных, хранящихся в основной памяти в RocksDB, всегда ограничен. По сути, это кэш, и нетронутое состояние с истекшим сроком действия должно быть в конечном итоге исключено из него. Остальная часть периодически выливается на диск, и со временем уплотняется . Это когда эта очистка TTL должна удалить состояние с истекшим сроком из системы.

...