Flink ValueState будет удален из хранилища по истечении срока его действия при использовании Rocksdb? - PullRequest
0 голосов
/ 16 июня 2020
  • Я использую Flink версии 1.10.1 с backend Rockdb.
  • Я знаю, что rocksdb использует память из «управляемой памяти», и я не устанавливал никаких специальных значений c для управляемой памяти. Это делает Flink.
  • Когда я слежу за своим приложением, свободная память диспетчеров задач всегда уменьшается (я имею в виду свободную память операционной системы, измеренную с помощью free -h). Я подозреваю, что причиной может быть Rocksdb.
  • Question_1 => если значение ValueState истекло, то rockdb удалит из своей памяти и удалит из каталога localstorage? (У меня также ограниченная емкость памяти)
  • Question_2 => stream.keyBy(ipAddress), если этот ipAddress будет удерживаться Rockdb (я говорю о keyBy, а не о состоянии), всегда ли он помещается в управляемую память ? Если нет, тогда память кучи флинков будет увеличена?

Вот общая структура моего приложения:

streamA = source.filter(..);
streamA2 = source2.filter(..);
streamB = streamA.keyBy(ipAddr).window().process(); // contains value state
streamC = streamA.keyBy(ipAddr).flatMap(..); // contains value state
streamD = streamA2.keyBy(ipAddr).window.process(); // contains value state
streamE = streamA.union(streamA2).keyBy(ipAddr)....

Вот пример состояния из моего приложения:

 private transient ValueState<SampleObject> sampleState;
 StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(10))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<SampleObject> sampleValueStateDescriptor = new ValueStateDescriptor<>(
                "sampleState",
                TypeInformation.of(SampleObject.class)
        );
        sampleValueStateDescriptor.enableTimeToLive(ttlConfig);

Конфигурация Rocksdb:

state.backend: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 6
state.backend.rocksdb.localdir: /pathTo/checkpoint_only_local

Почему я использую Rocksdb

  • Я использую Rockdb, потому что у меня огромный размер ключа (подумайте об этом IP-адрес) это не будет обрабатываться серверной частью HeapState или другим.
  • Мое приложение использует Rockdb, потому что у меня есть набор состояний в пользовательской ключевой функции для принятия решения в будущем. (каждое состояние имеет `StateTtlConfig)

Примечание

  • Мое приложение не требует инкрементной контрольной точки или чего-либо еще о точке сохранения. Меня не волнует сохранение всего снимка моего приложения.

1 Ответ

1 голос
/ 16 июня 2020

Flink ValueState будет удален из хранилища по истечении срока его действия при использовании Rocksdb?

Да, но не сразу. (А в некоторых более ранних версиях Flink ответ был «это зависит».)

В вашей конфигурации ttl состояния вы не указали, как вы хотите, чтобы очистка состояния выполнялась. В этом случае просроченные значения явно удаляются при чтении (например, ValueState#value), а в противном случае они периодически собираются мусором в фоновом режиме. В случае RocksDB эта очистка фона выполняется во время сжатия. Другими словами, очистка происходит не сразу. docs предоставляют более подробную информацию о том, как вы можете это настроить - вы можете настроить очистку так, чтобы она выполнялась быстрее, за счет некоторого снижения производительности.

KeyBy сам по себе не использует любое состояние. Функция селектора ключей используется для разделения потока, но ключи не сохраняются в связи с keyBy. Только операции windows и плоские карты сохраняют состояние, которое является состоянием для каждого ключа, и все это состояние с ключом будет в RocksDB (если вы не настроили свои таймеры в куче, что является опцией, но Flink 1.10 таймеров по умолчанию хранятся вне кучи, в Rockdb).

Вы можете изменить flatmap на KeyedProcessFunction и использовать таймеры для явной очистки состояния для ключей состояния, что даст вам прямой доступ контроль за тем, когда именно состояние очищается, вместо того, чтобы полагаться на механизм TTL состояния, чтобы в конечном итоге очистить состояние.

Но более вероятно, что windows создают значительное состояние. Если вы можете переключиться на предварительную агрегацию (через reduce или aggregate), это может очень помочь.

...