Apache Flink CEP, как перейти во временное окно на основе значения события? - PullRequest
0 голосов
/ 15 мая 2018
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("error");
        }
    }).followedBy("end").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

Есть ли способ заменить Time.seconds(10) на value.getSomeTimeField(), который я передаю через Event?

1 Ответ

0 голосов
/ 16 мая 2018

Полагаю, вы хотите работать в режиме времени. Подробнее об этом вы можете прочитать в этом документе и этом разделе о том, как извлечь метку времени из элемента.

В вашем примере вы можете сделать что-то вроде:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {

    @Override
    public long extractAscendingTimestamp(MyEvent element) {
        return value.getSomeTimeField();
    }
})

CEP.pattern(input, pattern).select(...)

Таким образом, события будут автоматически отсортированы в потоке, и в обоих случаях тайм-аут будет применяться в отношении поля времени.

...