Например, за A должно следовать B в течение 10 секунд. Я знаю, как отследить, произошел ли этот DID (.next, .within), но я хочу отправить предупреждение, если B никогда не происходило в окне.
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
// env.enableCheckpointing(1000);
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setVirtualHost("/")
.setUserName("guest")
.setPassword("guest")
.build();
final DataStream<String> inputStream = env
.addSource(new RMQSource<String>(
connectionConfig, // config for the RabbitMQ connection
"cep", // name of the RabbitMQ queue to consume
true, // use correlation ids; can be false if only at-least-once is required
new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
.setParallelism(1); // non-parallel source is only required for exactly-once
inputStream.print();
Pattern<String, ?> simplePattern =
Pattern.<String>begin("start")
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String event) {
return event.equals("A");
}
})
.next("end")
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String event) {
return event.equals("B");
}
});
PatternStream<String> timedOutPatternStream = CEP.pattern(inputStream, simplePattern.within(Time.seconds(10)));
OutputTag<String> timedout = new OutputTag<String>("timedout"){};
SingleOutputStreamOperator<String> timedOutNotificationsStream = timedOutPatternStream.flatSelect(
timedout,
new TimedOut<String>(),
new FlatSelectNothing<String>()
);
timedOutNotificationsStream.getSideOutput(timedout).print();
env.execute("mynotification");
}
public static class TimedOut<String> implements PatternFlatTimeoutFunction<String, String> {
@Override
public void timeout(Map<java.lang.String, List<String>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
out.collect((String) "LATE!");
}
}
public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
@Override
public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {}
}
Фактическое поведение:
publish "A"
(wait 5 seconds)
publish "B"
=> (no alert)
publish "A"
(wait 10 seconds)
=> (no alert, but should be)
publish "A"
(wait 10 seconds)
publish "B"
=> "LATE!"
Ожидаемое поведение:
publish "A"
(wait 10 seconds)
=> "LATE!"