Как назначить идентификатор окна сеанса в Apache Flink? - PullRequest
0 голосов
/ 13 июня 2019

Как назначить идентификатор окну сеанса в Apache Flink?

В конечном итоге я хочу обогатить события идентификатором окна сеанса по одному, пока открыты окна сеанса (я неЯ хочу подождать, пока окно не закроется, прежде чем отправлять обогащенные события).

Я пытался сделать это с помощью функции AggregateFunction, однако я не думаю, что merge () работает так, как ожидалось.Похоже, для объединения окон, а не панелей (триггеров).Кажется, это никогда не вызывается в моем конвейере.Поэтому кажется, что между триггерами нет общего состояния!

Идентификатор окна сеанса будет меткой времени первого события, попадающего в окно (из-за негарантированного упорядочения, которое может означать, что некоторые события могут потенциальнопопасть в то же окно сеанса с более ранней меткой времени - я в порядке с этим).

public class FooSessionState {

  private Long sessionCreationTime;

  private FooMatch lastMatch;
}

/**
 * Aggregator that assigns session ids to elements of a session window
 */
public class SessionIdAssigner implements
    AggregateFunction<FooMatch, FooSessionState, FooSessionEvent> {

  static final long serialVersionUID = 0L;

  @Override
  public FooSessionState createAccumulator() {
    return new FooSessionState();
  }

  @Override
  public FooSessionState add(FooMatch value, FooSessionState sessionState) {
    if (sessionState.getSessionCreationTime() == null) {
      sessionState.setSessionCreationTime(value.getReport().getTimestamp());
    }
    sessionState.setLastMatch(value);
    return sessionState;
  }

  @Override
  public FooSessionEvent getResult(FooSessionState accumulator) {
    FooSessionEvent sessionEvent = new FooSessionEvent();
    sessionEvent.setFooMatch(accumulator.getLastMatch());
    sessionEvent.setSessionCreationTime(accumulator.getSessionCreationTime());
    return sessionEvent;

  }

  @Override
  public FooSessionState merge(FooSessionState a, FooSessionState b) {

    if ( a.getSessionCreationTime() != null) {
      b.setSessionCreationTime(a.getSessionCreationTime());
    }

    return b;
  }
}

Я планировал использовать его следующим образом:

stream.keyBy(new FooMatchKeySelector())
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(config.getFooSessionWindowTimeout())))
    .trigger(PurgingTrigger.of(CountTrigger.of(1L)))
    .aggregate(new SessionIdAssigner())

1 Ответ

0 голосов
/ 14 июня 2019

Я думаю, что окна сеансов не подходят для того, чего вы хотите достичь.Они предназначены для агрегации событий за сеанс, но не для обогащения каждого события, т. Е. Они вычисляют результат и выдают его при закрытии окна.Как вы заметили, окна сеансов работают, создавая новое окно для каждого события и объединяя перекрывающиеся окна.Этот дизайн был выбран, потому что события могут прийти не по порядку.Следовательно, может случиться так, что у вас есть два окна, которые позднее будут связаны мостовым событием.

Я бы порекомендовал реализовать логику с ProcessFunction, который собирает события и сортирует их по метке времени.При получении водяного знака он генерирует все собранные события с правильными идентификаторами сеанса.Следовательно, вы держите только события между двумя водяными знаками в состоянии.В дополнение к этим событиям вам необходимо сохранить временную метку последнего отправленного события и идентификатор последнего отправленного сеанса, чтобы выполнить правильную сессию.

...