почему состояние всегда возвращает нуль в функции сопроцессора в Apache Flink? - PullRequest
0 голосов
/ 05 ноября 2019

Я соединил два потока и затем вызвал process, чтобы реализовать мою логику для получения результатов. Ниже приведен поток кода моего Флинка.

SingleOutputStreamOperator<LifetimeIndex> autoEncodedRulStream = dividedStream.getSideOutput(autoEncodedRULModelTag)
                .keyBy(a -> a.getModelName() + "-" + a.getParameterId(), TypeInformation.of(new TypeHint<String>() {
                }))
                .process(AssetParameterBundler.create())
                .connect(eventStream)
                .keyBy(a -> a.getAssetId() + "-" + a.getModelName(),b -> b.toString())
                .process(new Simulator())
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LifetimeIndex>(Time.milliseconds(0L)) {
                    @Override
                    public long extractTimestamp(LifetimeIndex element) {
                        return element.getTime().getTime();
                    }
                });

В классе simulator() я расширил Coprocessfunction, чтобы обработать два разных потока, которые являются eventStream и mainStream. Однако переменные состояния в классе simulator() всегда возвращают NULL, даже если я обновляю состояние в processElement2.

Ниже приведена логика класса simulator(). Как видите, особой логики нет, но новая логика срабатывает, когда новые данные поступают в поток событий (но он всегда равен нулю, поэтому оператор if не вызывал).

public class AutoEncoderSimulator extends CoProcessFunction<BundledAssetParameters, String, LifetimeIndex> {
//    private transient MapState<String,String> mapState;
    private transient ValueState<String> state;

    private int numOfdataPoints = 0;
    private List<double[]> trainElementList;
    private List<Double> estimatedThresholdList;

    private ApmAutoencoder autoencoder;
    private double rulThreshold;
    private double trainDataPoint;
    private double lastHealthIndex;
    private AutoUpdate autoUpdate;

    @Override
    public void processElement1(BundledAssetParameters value, Context ctx, Collector<LifetimeIndex> out) throws Exception {
        LazyObject body = new LazyObject(value.getSpecifications().getModelOptions());
        rulThreshold = body.getDouble("rulThreshold");
        trainDataPoint = body.getDouble("trainDataPoint");

        numOfdataPoints = numOfdataPoints + 1;
        String event = state.value();
        trainElementList.add(value.getValues());


      //EVENT IS ALWAYS NULL!
        if (event != null) {
        }

        if (numOfdataPoints == trainDataPoint) {
            double[][] trainElement = ArrayUtils.toArray(new double[trainElementList.size()][]);
            for(int i=0;i<numOfdataPoints;++i){
                trainElement[i] = trainElementList.get(i);
            }
            AutoEncoderConfig autoEncoderConfig = new AutoEncoderConfig(trainElement.length,trainElement[0].length);
            autoencoder = new ApmAutoencoder(autoEncoderConfig);
            autoencoder.train(trainElement, 200);

            //auto update
            AutoUpdate autoUpdate = new AutoUpdate();
            double firstThreshold = autoUpdate.firstThreshold(autoencoder.getHealthIndex(trainElement), 10, 0.995);
            estimatedThresholdList.add(firstThreshold);

        } else if (numOfdataPoints > trainDataPoint) {
            double[] values = value.getValues();

        }
    }




    @Override
    public void processElement2(String value, Context ctx, Collector<LifetimeIndex> out) throws Exception {
        state.update(value);
    }




    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>(
                "1",
                TypeInformation.of(new TypeHint<String>() {}));
        state = getRuntimeContext().getState(descriptor);

        trainElementList = new ArrayList<>();
    }
    public AutoEncoderSimulator create(){
        return new AutoEncoderSimulator();
    }
}

Iпризнателен, если вы решите мою проблему.

1 Ответ

1 голос
/ 05 ноября 2019

Одна вещь, которую вы, возможно, упустили из виду, это то, что это состояние с разделением по ключам - поэтому ваша переменная state не является строкой, но на самом деле является дескриптором, ссылающимся на распределенное хранилище ключей / значений (где ключи и значения являютсяСтроки, в данном случае).

Когда вы вызываете state.update(value) в processElement2, запись в этой хэш-таблице для ключа в контексте (ключ для текущего события) обновляется. Вы уверены, что тот же ключ находится в контексте, когда state.value() вызывается позже в processElement1?

Поскольку два подключенных потока находятся в состоянии совместного использования, крайне важно, чтобы оба потока были совместимы ключами,Я вижу, что оба потока основаны на строках, но не ясно, что эти строки находятся в одном и том же пространстве ключей. Маловероятно, что modelName + parameterId будет равно assetId + modelName.

На простом учебном сайте Flink есть простой пример этого паттерна , который может оказаться вам полезным.

...