почему на локальном компьютере получаются правильные результаты, но все по-другому, когда я работал на нескольких узлах в Apche Flink? - PullRequest
0 голосов
/ 07 ноября 2019

Ниже моя логика Флинк. Я сначала обучил значения, а затем создал модель с автоматическим кодированием. Затем я проверял следующие значения, пока новое событие не поступило в поток.

DataStream<LifetimeIndex> autoEncodedRulStream = dividedStream.getSideOutput(autoEncodedRULModelTag)
                .keyBy(a -> a.getModelName() + "-"+a.getParameterId(),TypeInformation.of(new TypeHint<String>() {
                 }))
                .process(AssetParameterBundler.create())
                .connect(eventStream)
                .keyBy(a -> a.getAssetId(),b-> b.getAssetId())
                .process(new AutoEncoderSimulator())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LifetimeIndex>(Time.milliseconds(0L)) {
                    @Override
                    public long extractTimestamp(LifetimeIndex element) { return element.getTime().getTime(); }
                })
                .name("apm-model-5")
                .uid("apm-model-5-id");

Что странно для меня, так это то, что результат обновляется на локальной машине, поэтому я загрузил его на распределенные машины и отправил файл jar, но он не обновил значение, котороеотличается от локальной машины.

Ниже приведен подробный код AutoEncoderSimulator, и, как вы можете видеть, он подразделяется на три части: train, test и event с помощью оператора if. Я пытался отправить последнее значение listState, но значение в listState не обновляется на распределенной машине, даже если я добавил значения в свою логику.

Я очень признателен, если вымог видеть мой код и дать мне помощь.

public class AutoEncoderSimulator extends CoProcessFunction<BundledAssetParameters, Event, LifetimeIndex>{
    private transient MapState<String,String> mapState;
    private transient ValueState<String> state;
    private transient ListState<Double> estimatedThresholdListState;

    private int numOfdataPoints = 0;
    private List<double[]> trainElementList;
    private List<Double> estimatedThresholdList;

    private ApmAutoencoder autoencoder;
    private double rulThreshold;
    private double trainDataPoint;
    private double lastHealthIndex;
    private AutoUpdate autoUpdate;

    @Override
    public void processElement1(BundledAssetParameters value, Context ctx, Collector<LifetimeIndex> out) throws Exception{
        String event = state.value();

    //EVENT OCCURRED
        if(state.value()!=null){
            double threshold = 0.0;
            if(estimatedThresholdList.size()>2){
                double[] thresholdValues = new double[estimatedThresholdList.size()];
                for (int i = 0; i < thresholdValues.length; i++) {
                    thresholdValues[i] = estimatedThresholdList.get(i);
                }
                double lastEstimatedThreshold = estimatedThresholdList.get(estimatedThresholdList.size()-1);
                threshold = autoUpdate.updateThreshold(thresholdValues,lastEstimatedThreshold, lastHealthIndex, 0.9, 0.7);
            }
            else
                threshold = autoUpdate.secondThreshold(lastHealthIndex,estimatedThresholdList.get(0));
            estimatedThresholdList.add(threshold);
            initialize();
            state.clear();
        }

        LazyObject body = new LazyObject(value.getSpecifications().getModelOptions());
        rulThreshold = 0.0;
        trainDataPoint = body.getDouble("trainDataPoint");

        numOfdataPoints = numOfdataPoints + 1;
        trainElementList.add(value.getValues());

        if(autoencoder!=null && numOfdataPoints < trainDataPoint){
            double [][]test = getTestValues(value);
            double[] healthIndex = autoencoder.getHealthIndex(test);
            collect(value, out, healthIndex[healthIndex.length - 1]);
        }


    //TRAIN
        if (numOfdataPoints == trainDataPoint){
            double[][] trainElement = ArrayUtils.toArray(new double[trainElementList.size()][]);
            for(int i=0;i<numOfdataPoints;++i){
                trainElement[i] = trainElementList.get(i);
            }
            AutoEncoderConfig autoEncoderConfig = new AutoEncoderConfig(trainElement.length,trainElement[0].length);
            autoencoder = new ApmAutoencoder(autoEncoderConfig);
            autoencoder.train(trainElement, 200);

            //auto update for the first time
            if(estimatedThresholdList.size()==0){
                double firstThreshold = autoUpdate.firstThreshold(autoencoder.getHealthIndex(trainElement), 10, 0.995);
                estimatedThresholdList.add(firstThreshold);
            }
        }else if (numOfdataPoints > trainDataPoint) {   //TEST
            double[][] test = getTestValues(value);
            double[] healthIndex = autoencoder.getHealthIndex(test);
            lastHealthIndex = healthIndex[healthIndex.length-1];
            collect(value, out, healthIndex[healthIndex.length - 1]);
        }
    }



    private double[][] getTestValues(BundledAssetParameters value) {
        double[] values = value.getValues();
        double[][] testElement = new double[1][0];
        testElement[0] = values;
        return testElement;
    }

    @Override
    public void processElement2(Event value, Context ctx, Collector<LifetimeIndex> out) throws Exception {
        state.update("event");
    }

    private void initialize() {
        trainElementList.clear();
        numOfdataPoints=0;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<String> stateDescriptor =
                new ValueStateDescriptor<String>("value",String.class);
        state = getRuntimeContext().getState(stateDescriptor);
        trainElementList = new ArrayList<>();
        estimatedThresholdList = new ArrayList<>();
        autoUpdate = new AutoUpdate();
    }

    public AutoEncoderSimulator create(){
        return new AutoEncoderSimulator();
    }

    private void collect(BundledAssetParameters value, Collector<LifetimeIndex> out, double healthIndex) {
            //print output
     }
}
...