Мне нужно сравнить предыдущую сессию со средними значениями из разных сессий для одного и того же пользователя. Я использую MapState для сохранения предыдущего сеанса, но каким-то образом maptate никогда не содержит никаких предыдущих ключей, поэтому каждый сеанс является новым. вот мой код:
SessionIdentificationProcessFunction (это функция, которая собирает все события, принадлежащие одному сеансу.
static SingleOutputStreamOperator<SessionEvent> sessionUser(KeyedStream<Event, String> stream) {
return stream.window(EventTimeSessionWindows.withGap(Time.minutes(PropertyFileReader.getGAP_SECTION())))
.allowedLateness(Time.minutes(PropertyFileReader.getLATENCY_ALLOWED()))
.process(new SessionIdentificationProcessFunction<Event, SessionEvent, String, TimeWindow>() {
@Override
public void open(Configuration parameters) {
/*state configured to live just one day to avoid garbage accumulation*/
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.days(1))
.cleanupFullSnapshot()
.build();
MapStateDescriptor<String, SessionEvent> map_descriptor = new MapStateDescriptor<>("prevMapUserSession", String.class, SessionEvent.class);
map_descriptor.enableTimeToLive(ttlConfig);
previous_user_sessions_state = getRuntimeContext().getMapState(map_descriptor);
}
@Override
public SessionEvent generateSessionRecord(String s, Context context, Iterable<Event> elements) {
Comparator<Event> sortFunc = (o1, o2) -> ((o1.timestamp.before(o2.timestamp)) ? 0 : 1);
Event start = StreamSupport.stream(elements.spliterator(), false).max(sortFunc).orElse(new Event());
Event end = StreamSupport.stream(elements.spliterator(), false).max(sortFunc).orElse(new Event());
SessionEvent session_user = (end.timestamp.equals(Timestamp.from(Instant.EPOCH))) ? new SessionEvent(start) : new SessionEvent(end);
session_user.sessionEvents = StreamSupport.stream(elements.spliterator(), false).count();
session_user.sessionDuration = sd;
try {
if (previous_user_sessions_state.contains(s)) {
SessionEvent previous = previous_user_sessions_state.get(s);
/*Update values of the session with the values of the previous which never exist and delete the previous session in the map to create a new entry with the new values updated*/
previous_user_sessions_state.remove(s);
} else {
/*always get here and create a new session*/
}
previous_user_sessions_state.put(s, session_user);
} catch (Exception e) {
e.printStackTrace();
}
return session_user;
}
})
.name("User Sessions");
}