MapState не сохраняет предыдущий сеанс с EventTimeSession Windows в Flink java - PullRequest
1 голос
/ 15 апреля 2020

Мне нужно сравнить предыдущую сессию со средними значениями из разных сессий для одного и того же пользователя. Я использую MapState для сохранения предыдущего сеанса, но каким-то образом maptate никогда не содержит никаких предыдущих ключей, поэтому каждый сеанс является новым. вот мой код:

SessionIdentificationProcessFunction (это функция, которая собирает все события, принадлежащие одному сеансу.

static SingleOutputStreamOperator<SessionEvent> sessionUser(KeyedStream<Event, String> stream) {
    return stream.window(EventTimeSessionWindows.withGap(Time.minutes(PropertyFileReader.getGAP_SECTION())))
            .allowedLateness(Time.minutes(PropertyFileReader.getLATENCY_ALLOWED()))
            .process(new SessionIdentificationProcessFunction<Event, SessionEvent, String, TimeWindow>() {
                @Override
                public void open(Configuration parameters) {
                    /*state configured to live just one day to avoid garbage accumulation*/
                    StateTtlConfig ttlConfig = StateTtlConfig
                            .newBuilder(org.apache.flink.api.common.time.Time.days(1))
                            .cleanupFullSnapshot()
                            .build();
                    MapStateDescriptor<String, SessionEvent> map_descriptor = new MapStateDescriptor<>("prevMapUserSession", String.class, SessionEvent.class);
                    map_descriptor.enableTimeToLive(ttlConfig);
                    previous_user_sessions_state = getRuntimeContext().getMapState(map_descriptor);
                }

                @Override
                public SessionEvent generateSessionRecord(String s, Context context, Iterable<Event> elements) {
                    Comparator<Event> sortFunc = (o1, o2) -> ((o1.timestamp.before(o2.timestamp)) ? 0 : 1);
                    Event start = StreamSupport.stream(elements.spliterator(), false).max(sortFunc).orElse(new Event());
                    Event end = StreamSupport.stream(elements.spliterator(), false).max(sortFunc).orElse(new Event());
                    SessionEvent session_user = (end.timestamp.equals(Timestamp.from(Instant.EPOCH))) ? new SessionEvent(start) : new SessionEvent(end);
                    session_user.sessionEvents = StreamSupport.stream(elements.spliterator(), false).count();
                    session_user.sessionDuration = sd;
                    try {
                        if (previous_user_sessions_state.contains(s)) {
                            SessionEvent previous = previous_user_sessions_state.get(s);

                           /*Update values of the session with the values of the previous which never exist and delete the previous session in the map to create a new entry with the new values updated*/

                            previous_user_sessions_state.remove(s);
                        } else {
                            /*always get here and create a new session*/
                        }

                        previous_user_sessions_state.put(s, session_user);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return session_user;
                }
            })
            .name("User Sessions");
}

1 Ответ

0 голосов
/ 15 апреля 2020

Не видя, как реализован SessionIdentificationProcessFunction, я не совсем уверен, что происходит, но сессия Флинка windows довольно особенная, поэтому неудивительно, что это не работает. Частично проблема заключается в том, что у любого данного окна сеанса очень короткое время жизни до его объединения с другим окном сеанса. (По мере поступления каждого нового события оно изначально присваивается собственному окну сеанса, после чего обрабатывается набор всего текущего сеанса windows и выполняются любые возможные объединения (на основе разрыва сеанса).)

Я рекомендую вместо getRuntimeContext().getMapState() использовать вместо context.globalState().getMapState() (где context - это ProcessWindowFunction.Context, переданный методу process() для ProcessWindowFunction). globalState - это KeyedStateStore, предназначенный именно для этой цели - сохранение состояния ключа, которое является глобальным / общим для всех экземпляров окна для этого ключа.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...