Flink CEP - поднять предупреждение, если последовательность не прибыла в течение времени, независимо от того, поступило ли другое сообщение - PullRequest
0 голосов
/ 17 января 2020

Я написал фрагмент Flink CEP, который проверяет шаблон состояния (обозначенный id) с помощью Relaxed Contiguity (followedBy). Идея состоит в том, чтобы подать предупреждение, если определенный статус не поступил после первого в течение определенного времени.

Это работает, однако, если в этом потоке есть обычные сообщения, оповещение никогда не срабатывает. Но только когда приходит сообщение с каким-то случайным статусом, эта часть запускается.

Итак, как мне заставить его вызвать оповещение, даже если в этот поток не поступило ни одного сообщения, когда сообщение со следующей последовательностью не поступило со временем?

Pattern<Transaction, Transaction> pattern = Pattern.<Transaction>begin("start")
            .where(new SimpleCondition<Transaction>() {

                private static final long serialVersionUID = 1L;

                @Override
                public boolean filter(Transaction value) throws Exception {
                    return value.getStatus().equals(Status.STATUS_1);
                }
            })
           .followedBy("end")
           .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction value) throws Exception {
                    return value.getStatus().equals(Status.STATUS_2);
                    return amlveri;
                }
            }).within(Time.seconds(15));

PatternStream<Transaction> patternStream = CEP.pattern(dataStreamSource, pattern);
OutputTag<Alert> timedOutPartialMatchesTag = new OutputTag<Alert>("alert",
            TypeInformation.of(Alert.class)) {};
    SingleOutputStreamOperator<Alert> select = patternStream.flatSelect(timedOutPartialMatchesTag,
            new PatternFlatTimeoutFunction<Transaction, Alert>() {

                @Override
                public void timeout(Map<String, List<Transaction>> values, long arg1, Collector<Alert> arg2)
                        throws Exception {
                    Transaction failedTrans = values.get("start").get(0);
                    arg2.collect(new Alert("status_2 didnt arrive in time, ", failedTrans));
                }
            }, new PatternFlatSelectFunction<Transaction, Alert>() {

                @Override
                public void flatSelect(Map<String, List<Transaction>> arg0, Collector<Alert> arg1)
                        throws Exception {
                       // do not do anything
                }
            });
    select.getSideOutput(timedOutPartialMatchesTag).print();

1 Ответ

1 голос
/ 17 января 2020

Если вы работаете со временем события, то метод within() ожидает водяного знака для запуска таймера времени события. Но при отсутствии прибывающих событий водяной знак не будет продвигаться (при условии, что вы используете что-то вроде BoundedOutOfOrdernessTimestampExtractor для создания водяных знаков).

Если вам нужно определить ход времени, когда нет поступающих событий, тогда это необходимо использовать время обработки. Вы можете установить TimeCharacteristic на время обработки или реализовать генератор водяных знаков, который использует таймеры обработки для искусственного продвижения водяного знака, несмотря на отсутствие событий.

...