Flink - испустить последнее окно, когда нет следующих событий - PullRequest
0 голосов
/ 29 мая 2018

У меня есть поток, содержащий события Event(Id, Type, Date), и я хочу обработать эти события, сгруппированные по (Id, Type) и сеансу активности


Например, из событий

Event1: Event(1, T1, 2018-01-24)
Event2: Event(2, T1, 2018-01-26)
Event3: Event(1, T2, 2018-01-28)
Event4: Event(1, T2, 2018-01-28)
...

Я ожидаю, что у меня будут следующие окна:

Window1 with Event1
Window2 with Event2
Window3 with Event3 and Event4
...

Насколько я понимаю, я смогу сделать это с окнами сеанса времени-события в потоке с ключами.Но с моим кодом печатается только первое окно (Window1), содержащее первое событие (Event1).

environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

environment
    .addSource(kafkaConsumer.setStartFromEarliest())
    .assignTimestampsAndWatermarks(<timestamp assigner>)
    .keyBy(e => (e.getId, e.getType))
    .window(EventTimeSessionWindows.withGap(Time.days(1)))
    .apply(new WindowFunction[Event, String, (String, String), TimeWindow]() {
          override def apply(key: (String, String), window: TimeWindow, input: Iterable[Event], out: Collector[String]): Unit = {
            var count = 0L
            for (in <- input) {
              count = count + 1
            }
            out.collect(s"Window $window count: $count")
          }
        })
  .print()

Это подходящий способ работы с историческими событиями и окнами сеанса?

1 Ответ

0 голосов
/ 30 мая 2018

Проблема в вашем случае заключается в том, что водяной знак всегда генерируется на основе входящих событий.Если нет входящих событий, водяной знак не прогрессирует.В вашем примере испускается только Window1, потому что только для Event1 есть другое следующее событие с отметкой времени, которое продвигает Watermark за промежуток между сеансами.Для остальных трех событий нет таких элементов.Для event3 и event4 таких событий нет вообще.Также из-за того, что поток имеет ключи, элементы с разными ключами обрабатываются независимо.Водяной знак не продвигается в этом случае, и поэтому окна не испускаются.

...