Похоже, ошибка в Flink-training-упражнения для примера CEP - PullRequest
0 голосов
/ 31 мая 2018

Я получил пример для CEP по следующему URL-адресу https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/cep/LongRides.java

И «цель этого упражнения - создать события START для поездок на такси, которые не были сопоставлены событием END в течение первого2 часа езды. "Однако из приведенного ниже кода, кажется, получить шаблон, чтобы найти, что поездки были завершены за 2 часа вместо НЕ было завершено за 2 часа.

Похоже, что шаблон сначала находитНачать событие, затем найти конечное событие (! Ride.isStart), и в течение 2 часов, так почему бы не объяснить, как шаблон для определения поездок был завершен за 2 часа?

Pattern<TaxiRide, TaxiRide> completedRides =
            Pattern.<TaxiRide>begin("start")
                    .where(new SimpleCondition<TaxiRide>() {
                        @Override
                        public boolean filter(TaxiRide ride) throws Exception {
                            return ride.isStart;
                        }
                    })
                    .next("end")
                    .where(new SimpleCondition<TaxiRide>() {
                        @Override
                        public boolean filter(TaxiRide ride) throws Exception {
                            return !ride.isStart;
                        }
                    });

    // We want to find rides that have NOT been completed within 120 minutes
    PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedRides.within(Time.minutes(120)));

1 Ответ

0 голосов
/ 01 июня 2018

Я улучшил комментарий в примере решения, чтобы сделать его более понятным.

// We want to find rides that have NOT been completed within 120 minutes.
// This pattern matches rides that ARE completed.
// Below we will ignore rides that match this pattern, and emit those that timeout.
PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedRides.within(Time.minutes(120)));

OutputTag<TaxiRide> timedout = new OutputTag<TaxiRide>("timedout"){};

SingleOutputStreamOperator<TaxiRide> longRides = patternStream.flatSelect(
        timedout,
        new TaxiRideTimedOut<TaxiRide>(),
        new FlatSelectNothing<TaxiRide>()
);
...