Как назначить идентификатор окну сеанса в Apache Flink?
В конечном итоге я хочу обогатить события идентификатором окна сеанса по одному, пока открыты окна сеанса (я неЯ хочу подождать, пока окно не закроется, прежде чем отправлять обогащенные события).
Я пытался сделать это с помощью функции AggregateFunction, однако я не думаю, что merge () работает так, как ожидалось.Похоже, для объединения окон, а не панелей (триггеров).Кажется, это никогда не вызывается в моем конвейере.Поэтому кажется, что между триггерами нет общего состояния!
Идентификатор окна сеанса будет меткой времени первого события, попадающего в окно (из-за негарантированного упорядочения, которое может означать, что некоторые события могут потенциальнопопасть в то же окно сеанса с более ранней меткой времени - я в порядке с этим).
public class FooSessionState {
private Long sessionCreationTime;
private FooMatch lastMatch;
}
/**
* Aggregator that assigns session ids to elements of a session window
*/
public class SessionIdAssigner implements
AggregateFunction<FooMatch, FooSessionState, FooSessionEvent> {
static final long serialVersionUID = 0L;
@Override
public FooSessionState createAccumulator() {
return new FooSessionState();
}
@Override
public FooSessionState add(FooMatch value, FooSessionState sessionState) {
if (sessionState.getSessionCreationTime() == null) {
sessionState.setSessionCreationTime(value.getReport().getTimestamp());
}
sessionState.setLastMatch(value);
return sessionState;
}
@Override
public FooSessionEvent getResult(FooSessionState accumulator) {
FooSessionEvent sessionEvent = new FooSessionEvent();
sessionEvent.setFooMatch(accumulator.getLastMatch());
sessionEvent.setSessionCreationTime(accumulator.getSessionCreationTime());
return sessionEvent;
}
@Override
public FooSessionState merge(FooSessionState a, FooSessionState b) {
if ( a.getSessionCreationTime() != null) {
b.setSessionCreationTime(a.getSessionCreationTime());
}
return b;
}
}
Я планировал использовать его следующим образом:
stream.keyBy(new FooMatchKeySelector())
.window(EventTimeSessionWindows.withGap(Time.milliseconds(config.getFooSessionWindowTimeout())))
.trigger(PurgingTrigger.of(CountTrigger.of(1L)))
.aggregate(new SessionIdAssigner())