Ниже моя логика Флинк. Я сначала обучил значения, а затем создал модель с автоматическим кодированием. Затем я проверял следующие значения, пока новое событие не поступило в поток.
DataStream<LifetimeIndex> autoEncodedRulStream = dividedStream.getSideOutput(autoEncodedRULModelTag)
.keyBy(a -> a.getModelName() + "-"+a.getParameterId(),TypeInformation.of(new TypeHint<String>() {
}))
.process(AssetParameterBundler.create())
.connect(eventStream)
.keyBy(a -> a.getAssetId(),b-> b.getAssetId())
.process(new AutoEncoderSimulator())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LifetimeIndex>(Time.milliseconds(0L)) {
@Override
public long extractTimestamp(LifetimeIndex element) { return element.getTime().getTime(); }
})
.name("apm-model-5")
.uid("apm-model-5-id");
Что странно для меня, так это то, что результат обновляется на локальной машине, поэтому я загрузил его на распределенные машины и отправил файл jar, но он не обновил значение, котороеотличается от локальной машины.
Ниже приведен подробный код AutoEncoderSimulator
, и, как вы можете видеть, он подразделяется на три части: train
, test
и event
с помощью оператора if. Я пытался отправить последнее значение listState
, но значение в listState
не обновляется на распределенной машине, даже если я добавил значения в свою логику.
Я очень признателен, если вымог видеть мой код и дать мне помощь.
public class AutoEncoderSimulator extends CoProcessFunction<BundledAssetParameters, Event, LifetimeIndex>{
private transient MapState<String,String> mapState;
private transient ValueState<String> state;
private transient ListState<Double> estimatedThresholdListState;
private int numOfdataPoints = 0;
private List<double[]> trainElementList;
private List<Double> estimatedThresholdList;
private ApmAutoencoder autoencoder;
private double rulThreshold;
private double trainDataPoint;
private double lastHealthIndex;
private AutoUpdate autoUpdate;
@Override
public void processElement1(BundledAssetParameters value, Context ctx, Collector<LifetimeIndex> out) throws Exception{
String event = state.value();
//EVENT OCCURRED
if(state.value()!=null){
double threshold = 0.0;
if(estimatedThresholdList.size()>2){
double[] thresholdValues = new double[estimatedThresholdList.size()];
for (int i = 0; i < thresholdValues.length; i++) {
thresholdValues[i] = estimatedThresholdList.get(i);
}
double lastEstimatedThreshold = estimatedThresholdList.get(estimatedThresholdList.size()-1);
threshold = autoUpdate.updateThreshold(thresholdValues,lastEstimatedThreshold, lastHealthIndex, 0.9, 0.7);
}
else
threshold = autoUpdate.secondThreshold(lastHealthIndex,estimatedThresholdList.get(0));
estimatedThresholdList.add(threshold);
initialize();
state.clear();
}
LazyObject body = new LazyObject(value.getSpecifications().getModelOptions());
rulThreshold = 0.0;
trainDataPoint = body.getDouble("trainDataPoint");
numOfdataPoints = numOfdataPoints + 1;
trainElementList.add(value.getValues());
if(autoencoder!=null && numOfdataPoints < trainDataPoint){
double [][]test = getTestValues(value);
double[] healthIndex = autoencoder.getHealthIndex(test);
collect(value, out, healthIndex[healthIndex.length - 1]);
}
//TRAIN
if (numOfdataPoints == trainDataPoint){
double[][] trainElement = ArrayUtils.toArray(new double[trainElementList.size()][]);
for(int i=0;i<numOfdataPoints;++i){
trainElement[i] = trainElementList.get(i);
}
AutoEncoderConfig autoEncoderConfig = new AutoEncoderConfig(trainElement.length,trainElement[0].length);
autoencoder = new ApmAutoencoder(autoEncoderConfig);
autoencoder.train(trainElement, 200);
//auto update for the first time
if(estimatedThresholdList.size()==0){
double firstThreshold = autoUpdate.firstThreshold(autoencoder.getHealthIndex(trainElement), 10, 0.995);
estimatedThresholdList.add(firstThreshold);
}
}else if (numOfdataPoints > trainDataPoint) { //TEST
double[][] test = getTestValues(value);
double[] healthIndex = autoencoder.getHealthIndex(test);
lastHealthIndex = healthIndex[healthIndex.length-1];
collect(value, out, healthIndex[healthIndex.length - 1]);
}
}
private double[][] getTestValues(BundledAssetParameters value) {
double[] values = value.getValues();
double[][] testElement = new double[1][0];
testElement[0] = values;
return testElement;
}
@Override
public void processElement2(Event value, Context ctx, Collector<LifetimeIndex> out) throws Exception {
state.update("event");
}
private void initialize() {
trainElementList.clear();
numOfdataPoints=0;
}
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<String> stateDescriptor =
new ValueStateDescriptor<String>("value",String.class);
state = getRuntimeContext().getState(stateDescriptor);
trainElementList = new ArrayList<>();
estimatedThresholdList = new ArrayList<>();
autoUpdate = new AutoUpdate();
}
public AutoEncoderSimulator create(){
return new AutoEncoderSimulator();
}
private void collect(BundledAssetParameters value, Collector<LifetimeIndex> out, double healthIndex) {
//print output
}
}