Я написал фрагмент Flink CEP, который проверяет шаблон состояния (обозначенный id
) с помощью Relaxed Contiguity
(followedBy
). Идея состоит в том, чтобы подать предупреждение, если определенный статус не поступил после первого в течение определенного времени.
Это работает, однако, если в этом потоке есть обычные сообщения, оповещение никогда не срабатывает. Но только когда приходит сообщение с каким-то случайным статусом, эта часть запускается.
Итак, как мне заставить его вызвать оповещение, даже если в этот поток не поступило ни одного сообщения, когда сообщение со следующей последовательностью не поступило со временем?
Pattern<Transaction, Transaction> pattern = Pattern.<Transaction>begin("start")
.where(new SimpleCondition<Transaction>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(Transaction value) throws Exception {
return value.getStatus().equals(Status.STATUS_1);
}
})
.followedBy("end")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction value) throws Exception {
return value.getStatus().equals(Status.STATUS_2);
return amlveri;
}
}).within(Time.seconds(15));
PatternStream<Transaction> patternStream = CEP.pattern(dataStreamSource, pattern);
OutputTag<Alert> timedOutPartialMatchesTag = new OutputTag<Alert>("alert",
TypeInformation.of(Alert.class)) {};
SingleOutputStreamOperator<Alert> select = patternStream.flatSelect(timedOutPartialMatchesTag,
new PatternFlatTimeoutFunction<Transaction, Alert>() {
@Override
public void timeout(Map<String, List<Transaction>> values, long arg1, Collector<Alert> arg2)
throws Exception {
Transaction failedTrans = values.get("start").get(0);
arg2.collect(new Alert("status_2 didnt arrive in time, ", failedTrans));
}
}, new PatternFlatSelectFunction<Transaction, Alert>() {
@Override
public void flatSelect(Map<String, List<Transaction>> arg0, Collector<Alert> arg1)
throws Exception {
// do not do anything
}
});
select.getSideOutput(timedOutPartialMatchesTag).print();