Почему CEP не печатает первое событие только после ввода второго события при использовании ProcessingTime? - PullRequest
0 голосов
/ 12 июня 2018

Я отправил одно событие с isStart true в kafka и заставил Flink потреблять событие из kafka, также установил TimeCharacteristic в ProcessingTime и установил в (Time.seconds (5)), поэтому я ожидал, что CEP напечатает событиечерез 5 секунд я отправил первое событие, но это не так, и оно напечатало первое событие только после того, как я отправил второе событие в kafka.Почему он напечатал первое событие только после двух событий?Разве это не должно быть напечатано событие только через 5 секунд, я отправил первое при использовании ProcessingTime?

Следующий код:

public class LongRidesWithKafka {
private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
private static final String RIDE_SPEED_GROUP = "rideSpeedGroup";
private static final int MAX_EVENT_DELAY = 60; // rides are at most 60 sec out-of-order.

public static void main(String[] args) throws Exception {
    final int popThreshold = 1; // threshold for popular places
    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    Properties kafkaProps = new Properties();
    //kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
    // always read the Kafka topic from the start
    kafkaProps.setProperty("auto.offset.reset", "earliest");

    // create a Kafka consumer
    FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
            "flinktest",
            new TaxiRideSchema(),
            kafkaProps);
    // assign a timestamp extractor to the consumer
    //consumer.assignTimestampsAndWatermarks(new CustomWatermarkExtractor());
    DataStream<TaxiRide> rides = env.addSource(consumer);

    DataStream<TaxiRide> keyedRides = rides.keyBy("rideId");
    // A complete taxi ride has a START event followed by an END event
    Pattern<TaxiRide, TaxiRide> completedRides =
            Pattern.<TaxiRide>begin("start")
                    .where(new SimpleCondition<TaxiRide>() {
                        @Override
                        public boolean filter(TaxiRide ride) throws Exception {
                            return ride.isStart;
                        }
                    })
                    .next("end")
                    .where(new SimpleCondition<TaxiRide>() {
                        @Override
                        public boolean filter(TaxiRide ride) throws Exception {
                            return !ride.isStart;
                        }
                    });

    // We want to find rides that have NOT been completed within 120 minutes
    PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedRides.within(Time.seconds(5)));

    OutputTag<TaxiRide> timedout = new OutputTag<TaxiRide>("timedout") {
    };
    SingleOutputStreamOperator<TaxiRide> longRides = patternStream.flatSelect(
            timedout,
            new LongRides.TaxiRideTimedOut<TaxiRide>(),
            new LongRides.FlatSelectNothing<TaxiRide>()
    );
    longRides.getSideOutput(timedout).print();
    env.execute("Long Taxi Rides");
}

public static class TaxiRideTimedOut<TaxiRide> implements PatternFlatTimeoutFunction<TaxiRide, TaxiRide> {
    @Override
    public void timeout(Map<String, List<TaxiRide>> map, long l, Collector<TaxiRide> collector) throws Exception {
        TaxiRide rideStarted = map.get("start").get(0);
        collector.collect(rideStarted);
    }
}

public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
    @Override
    public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
    }
}

private static class TaxiRideTSExtractor extends AscendingTimestampExtractor<TaxiRide> {
    private static final long serialVersionUID = 1L;

    @Override
    public long extractAscendingTimestamp(TaxiRide ride) {

        //  Watermark Watermark = getCurrentWatermark();

        if (ride.isStart) {
            return ride.startTime.getMillis();
        } else {
            return ride.endTime.getMillis();
        }
    }
}


private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<TaxiRide> {

    private static final long serialVersionUID = -742759155861320823L;

    private long currentTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(TaxiRide ride, long previousElementTimestamp) {
        // the inputs are assumed to be of format (message,timestamp)

        if (ride.isStart) {
            this.currentTimestamp = ride.startTime.getMillis();
            return ride.startTime.getMillis();
        } else {
            this.currentTimestamp = ride.endTime.getMillis();
            return ride.endTime.getMillis();
        }
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }
}

}

1 Ответ

0 голосов
/ 13 июня 2018

Причина в том, что библиотека CEP Флинка в настоящее время проверяет временные метки только в случае поступления и обработки другого элемента.Основное предположение состоит в том, что у вас постоянный поток событий.

Я думаю, что это ограничение библиотеки CEP Флинка.Для правильной работы Flink должен зарегистрировать таймеры времени обработки с arrivalTime + timeout, которые запускают тайм-аут шаблонов, если события не поступают.

...