Flink MapState очищает все значения ключа карты во время очистки TTL - PullRequest
0 голосов
/ 18 марта 2020

У меня есть требование сохранять последние 25 se c значений в состоянии карты мерцания для каждого ключа, но TTL удаляет все значения сразу после достижения 25-se c. пожалуйста, смотрите мой код, в Списке кодов хранятся входящие данные для каждого идентификатора датчика каждую секунду, чтобы уменьшить объем памяти, я должен сохранять только 25-se c данных в Списке. Есть ли способ достичь этого? TTL очищает весь список.

public class ContinousDataProcessor
    extends   KeyedProcessFunction<String,SensorData,Tuple2<String,Integer>> {

private transient MapState<String, List<SensorData>> SensorValueMapState;
private static final long serialVersionUID = 1L;

@Override
public void open(Configuration config) {
    MapStateDescriptor<String, List<SensorData>> varibaleTagValueMapDescriptor = new MapStateDescriptor(
            "variableTagValueMapState", String.class, SensorData.class);
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(25))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();

    varibaleTagValueMapDescriptor.enableTimeToLive(ttlConfig);
    SensorValueMapState= getRuntimeContext().getMapState(varibaleTagValueMapDescriptor);

}

@Override
public void processElement(SensorData inputData, Context arg1, Collector arg2) throws Exception {


    if (SensorValueMapState.contains(inputData.sensorId)) {
        SensorValueMapState.get(inputData.sensorId).add(inputData);
    } else {
        List<SensorData> sensorDataList = new ArrayList<>();
        sensorDataList.add(inputData);
        SensorValueMapState.put(inputData.sensorId, sensorDataList);
    }

            for (SensorData str : SensorValueMapState.get(inputData.sensorId)) {

        System.out.println(str.eventTime);          
    }}

1 Ответ

1 голос
/ 18 марта 2020

Как я понимаю, вы хотите, чтобы TTL применялся для каждого элемента в списке. Список является значением в состоянии карты в вашем случае. Состояние карты не имеет представления о структуре значения пользователя в состоянии карты. Это ограничение расположения данных в бэкэндах состояния. Следовательно, применение TTL для каждого элемента невозможно в текущей реализации.

TTL применяется для каждого пользовательского значения в состоянии значения, для каждого пользовательского элемента в состоянии списка и для каждой пары ключ / значение в состоянии карты.

В зависимости от требований вашего приложения, вы можете попробовать состояние списка с помощью составного ключа:

key of KeyedProcessFunction = current key of your KeyedProcessFunction + your current map state key

Это не позволяет легко получить все списки по текущему ключу вашей функции KeyedProcessFunction, как вы можете сделай это сейчас.

...