У меня есть требование сохранять последние 25 se c значений в состоянии карты мерцания для каждого ключа, но TTL удаляет все значения сразу после достижения 25-se c. пожалуйста, смотрите мой код, в Списке кодов хранятся входящие данные для каждого идентификатора датчика каждую секунду, чтобы уменьшить объем памяти, я должен сохранять только 25-se c данных в Списке. Есть ли способ достичь этого? TTL очищает весь список.
public class ContinousDataProcessor
extends KeyedProcessFunction<String,SensorData,Tuple2<String,Integer>> {
private transient MapState<String, List<SensorData>> SensorValueMapState;
private static final long serialVersionUID = 1L;
@Override
public void open(Configuration config) {
MapStateDescriptor<String, List<SensorData>> varibaleTagValueMapDescriptor = new MapStateDescriptor(
"variableTagValueMapState", String.class, SensorData.class);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(25))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
varibaleTagValueMapDescriptor.enableTimeToLive(ttlConfig);
SensorValueMapState= getRuntimeContext().getMapState(varibaleTagValueMapDescriptor);
}
@Override
public void processElement(SensorData inputData, Context arg1, Collector arg2) throws Exception {
if (SensorValueMapState.contains(inputData.sensorId)) {
SensorValueMapState.get(inputData.sensorId).add(inputData);
} else {
List<SensorData> sensorDataList = new ArrayList<>();
sensorDataList.add(inputData);
SensorValueMapState.put(inputData.sensorId, sensorDataList);
}
for (SensorData str : SensorValueMapState.get(inputData.sensorId)) {
System.out.println(str.eventTime);
}}