Я соединил два потока и затем вызвал process
, чтобы реализовать мою логику для получения результатов. Ниже приведен поток кода моего Флинка.
SingleOutputStreamOperator<LifetimeIndex> autoEncodedRulStream = dividedStream.getSideOutput(autoEncodedRULModelTag)
.keyBy(a -> a.getModelName() + "-" + a.getParameterId(), TypeInformation.of(new TypeHint<String>() {
}))
.process(AssetParameterBundler.create())
.connect(eventStream)
.keyBy(a -> a.getAssetId() + "-" + a.getModelName(),b -> b.toString())
.process(new Simulator())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LifetimeIndex>(Time.milliseconds(0L)) {
@Override
public long extractTimestamp(LifetimeIndex element) {
return element.getTime().getTime();
}
});
В классе simulator()
я расширил Coprocessfunction
, чтобы обработать два разных потока, которые являются eventStream и mainStream. Однако переменные состояния в классе simulator()
всегда возвращают NULL
, даже если я обновляю состояние в processElement2
.
Ниже приведена логика класса simulator()
. Как видите, особой логики нет, но новая логика срабатывает, когда новые данные поступают в поток событий (но он всегда равен нулю, поэтому оператор if не вызывал).
public class AutoEncoderSimulator extends CoProcessFunction<BundledAssetParameters, String, LifetimeIndex> {
// private transient MapState<String,String> mapState;
private transient ValueState<String> state;
private int numOfdataPoints = 0;
private List<double[]> trainElementList;
private List<Double> estimatedThresholdList;
private ApmAutoencoder autoencoder;
private double rulThreshold;
private double trainDataPoint;
private double lastHealthIndex;
private AutoUpdate autoUpdate;
@Override
public void processElement1(BundledAssetParameters value, Context ctx, Collector<LifetimeIndex> out) throws Exception {
LazyObject body = new LazyObject(value.getSpecifications().getModelOptions());
rulThreshold = body.getDouble("rulThreshold");
trainDataPoint = body.getDouble("trainDataPoint");
numOfdataPoints = numOfdataPoints + 1;
String event = state.value();
trainElementList.add(value.getValues());
//EVENT IS ALWAYS NULL!
if (event != null) {
}
if (numOfdataPoints == trainDataPoint) {
double[][] trainElement = ArrayUtils.toArray(new double[trainElementList.size()][]);
for(int i=0;i<numOfdataPoints;++i){
trainElement[i] = trainElementList.get(i);
}
AutoEncoderConfig autoEncoderConfig = new AutoEncoderConfig(trainElement.length,trainElement[0].length);
autoencoder = new ApmAutoencoder(autoEncoderConfig);
autoencoder.train(trainElement, 200);
//auto update
AutoUpdate autoUpdate = new AutoUpdate();
double firstThreshold = autoUpdate.firstThreshold(autoencoder.getHealthIndex(trainElement), 10, 0.995);
estimatedThresholdList.add(firstThreshold);
} else if (numOfdataPoints > trainDataPoint) {
double[] values = value.getValues();
}
}
@Override
public void processElement2(String value, Context ctx, Collector<LifetimeIndex> out) throws Exception {
state.update(value);
}
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>(
"1",
TypeInformation.of(new TypeHint<String>() {}));
state = getRuntimeContext().getState(descriptor);
trainElementList = new ArrayList<>();
}
public AutoEncoderSimulator create(){
return new AutoEncoderSimulator();
}
}
Iпризнателен, если вы решите мою проблему.