Flink Event Session Window не генерирует записи - PullRequest
0 голосов
/ 18 апреля 2020

Я записываю канал в групповой сеанс для пользователя с ключом id и окном, используя eventSessionWindow. Я использую Periodi c WM и пользовательский аккумулятор сеансов, который будет считать событие заданным сеансом.

Что происходит, мой оператор окна использует записи, но не выдает их. Я не уверен, чего здесь не хватает.


FlinkKafkaConsumer010<String> eventSource =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), properties);
        eventSource.setStartFromLatest();

DataStream<Event> eventStream = env.addSource(eventSource
        ).flatMap(
                new FlatMapFunction<String, Event>() {

                    @Override
                    public void flatMap(String value, Collector<Event> out) throws Exception {
                        out.collect(Event.toEvent(value));
                    }
                }
        ).assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<Event>() {
                    long maxTime;

                    @Override
                    public long extractTimestamp(Event element, long previousElementTimestamp) {
                        maxTime = Math.max(previousElementTimestamp, maxTime);
                        return previousElementTimestamp;
                    }

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(maxTime);
                    }
                }
        );

       DataStream <Session> session_stream =eventStream.keyBy((KeySelector<Event, String>)value -> value.id)
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))

                .aggregate(new AggregateFunction<Event, pipe.SessionAccumulator, Session>() {
                    @Override
                    public pipe.SessionAccumulator createAccumulator() {
                        return new pipe.SessionAccumulator();
                    }

                    @Override
                    public pipe.SessionAccumulator add(Event e, pipe.SessionAccumulator sessionAccumulator) {
                        sessionAccumulator.add(e);
                        return sessionAccumulator;
                    }

                    @Override
                    public Session getResult(pipe.SessionAccumulator sessionAccumulator) {
                        return sessionAccumulator.getLocalValue();
                    }

                    @Override
                    public pipe.SessionAccumulator merge(pipe.SessionAccumulator prev, pipe.SessionAccumulator next) {
                        prev.merge(next);
                        return prev;
                    }

                }, new WindowFunction<Session, Session, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow timeWindow, Iterable<Session> iterable, Collector<Session> collector) throws Exception {
                        collector.collect(iterable.iterator().next());
                    }
                });


    public static class SessionAccumulator implements Accumulator<Event, Session>{
        Session session;

        public SessionAccumulator(){
            session = new Session();
        }

        @Override
        public void add(Event e) {
            session.add(e);

        }

        @Override
        public Session getLocalValue() {
            return session;
        }

        @Override
        public void resetLocal() {
            session =  new Session();

        }

        @Override
        public void merge(Accumulator<Event, Session> accumulator) {
            session.merge(Collections.singletonList(accumulator.getLocalValue()));

        }

        @Override
        public Accumulator<Event, Session> clone() {
            SessionAccumulator sessionAccumulator = new SessionAccumulator();
            sessionAccumulator.session = new Session(
                    session.id,
            );
            return sessionAccumulator;
        }
    }


    public static class SessionAccumulator implements Accumulator<Event, Session>{
        Session session;

        public SessionAccumulator(){
            session = new Session();
        }

        @Override
        public void add(Event e) {
            session.add(e);

        }

        @Override
        public Session getLocalValue() {
            return session;
        }

        @Override
        public void resetLocal() {
            session =  new Session();

        }

        @Override
        public void merge(Accumulator<Event, Session> accumulator) {
            session.merge(Collections.singletonList(accumulator.getLocalValue()));

        }

        @Override
        public Accumulator<Event, Session> clone() {
            SessionAccumulator sessionAccumulator = new SessionAccumulator();
            sessionAccumulator.session = new Session(
                    session.id,
                    session.lastEventTime,
                    session.earliestEventTime,
                    session.count;

            );
            return sessionAccumulator;
        }
    }

1 Ответ

0 голосов
/ 19 апреля 2020

Если ваши водяные знаки не продвигаются, это объясняет, почему окно не выводит никаких результатов. Возможные причины:

  • Kafka не пометила ваши события меткой времени, и поэтому значение previousElementTimestamp не установлено.
  • У вас неактивный раздел Kafka, удерживающий водяные знаки. (Это довольно сложная топи c. Если это является причиной ваших проблем, и вы застряли на ней, пожалуйста, вернитесь с новым вопросом.)

Другая возможность в том, что в событиях никогда не бывает 5-минутного промежутка, и в этом случае события будут накапливаться в бесконечной сессии.

Кроме того, вы, похоже, не включили приемник. Если вы не распечатаете или не отправите иным образом результаты в приемник, Флинк ничего не сделает.

И не забывайте, что вы должны позвонить env.execute(), чтобы что-то произошло.

Несколько других вещей:

Ваш генератор водяных знаков не допускает какой-либо неупорядоченности, поэтому окно будет игнорировать все неупорядоченные события (потому что они будут запаздывать). Если ваши события имеют строго восходящие временные метки, вы должны go впереди и использовать AscendingTimestampExtractor ; если они могут быть не в порядке, тогда BoundedOutOfOrdernessTimestampExtractor подходит.

Ваша WindowFunction излишня. Он просто перенаправляет результат от агрегатора, чтобы вы могли удалить его.

Вы опубликовали две разные реализации SessionAccumulator.

...