Я записываю канал в групповой сеанс для пользователя с ключом id и окном, используя eventSessionWindow. Я использую Periodi c WM и пользовательский аккумулятор сеансов, который будет считать событие заданным сеансом.
Что происходит, мой оператор окна использует записи, но не выдает их. Я не уверен, чего здесь не хватает.
FlinkKafkaConsumer010<String> eventSource =
new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), properties);
eventSource.setStartFromLatest();
DataStream<Event> eventStream = env.addSource(eventSource
).flatMap(
new FlatMapFunction<String, Event>() {
@Override
public void flatMap(String value, Collector<Event> out) throws Exception {
out.collect(Event.toEvent(value));
}
}
).assignTimestampsAndWatermarks(
new AssignerWithPeriodicWatermarks<Event>() {
long maxTime;
@Override
public long extractTimestamp(Event element, long previousElementTimestamp) {
maxTime = Math.max(previousElementTimestamp, maxTime);
return previousElementTimestamp;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(maxTime);
}
}
);
DataStream <Session> session_stream =eventStream.keyBy((KeySelector<Event, String>)value -> value.id)
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
.aggregate(new AggregateFunction<Event, pipe.SessionAccumulator, Session>() {
@Override
public pipe.SessionAccumulator createAccumulator() {
return new pipe.SessionAccumulator();
}
@Override
public pipe.SessionAccumulator add(Event e, pipe.SessionAccumulator sessionAccumulator) {
sessionAccumulator.add(e);
return sessionAccumulator;
}
@Override
public Session getResult(pipe.SessionAccumulator sessionAccumulator) {
return sessionAccumulator.getLocalValue();
}
@Override
public pipe.SessionAccumulator merge(pipe.SessionAccumulator prev, pipe.SessionAccumulator next) {
prev.merge(next);
return prev;
}
}, new WindowFunction<Session, Session, String, TimeWindow>() {
@Override
public void apply(String s, TimeWindow timeWindow, Iterable<Session> iterable, Collector<Session> collector) throws Exception {
collector.collect(iterable.iterator().next());
}
});
public static class SessionAccumulator implements Accumulator<Event, Session>{
Session session;
public SessionAccumulator(){
session = new Session();
}
@Override
public void add(Event e) {
session.add(e);
}
@Override
public Session getLocalValue() {
return session;
}
@Override
public void resetLocal() {
session = new Session();
}
@Override
public void merge(Accumulator<Event, Session> accumulator) {
session.merge(Collections.singletonList(accumulator.getLocalValue()));
}
@Override
public Accumulator<Event, Session> clone() {
SessionAccumulator sessionAccumulator = new SessionAccumulator();
sessionAccumulator.session = new Session(
session.id,
);
return sessionAccumulator;
}
}
public static class SessionAccumulator implements Accumulator<Event, Session>{
Session session;
public SessionAccumulator(){
session = new Session();
}
@Override
public void add(Event e) {
session.add(e);
}
@Override
public Session getLocalValue() {
return session;
}
@Override
public void resetLocal() {
session = new Session();
}
@Override
public void merge(Accumulator<Event, Session> accumulator) {
session.merge(Collections.singletonList(accumulator.getLocalValue()));
}
@Override
public Accumulator<Event, Session> clone() {
SessionAccumulator sessionAccumulator = new SessionAccumulator();
sessionAccumulator.session = new Session(
session.id,
session.lastEventTime,
session.earliestEventTime,
session.count;
);
return sessionAccumulator;
}
}