Окно неупорядоченных временных событий - PullRequest
0 голосов
/ 20 мая 2018

В следующем простом коде Flink у меня есть 3 события с временными событиями, по 1 секунде между каждыми двумя.Они служат для неупорядоченного Flink: events: 2, 1, 3. Я заметил, что когда я изменяю аргумент для timeWindowAll, я иногда получаю события, все 3 события печатаются, а иногда только 2 и 3.

Некоторые примеры:

.timeWindowAll(Time.seconds(3)) --> 2, 3
.timeWindowAll(Time.seconds(4)) --> 2, 1, 3
.timeWindowAll(Time.seconds(5)) --> 2, 3
.timeWindowAll(Time.seconds(6)) --> 2, 3
.timeWindowAll(Time.seconds(7)) --> 2, 1, 3
.timeWindowAll(Time.seconds(8)) --> 2, 1, 3
.timeWindowAll(Time.seconds(9)) --> 2, 1, 3
.timeWindowAll(Time.seconds(10)) --> 2, 3
...

Может кто-нибудь объяснить, почему так происходит?Я предполагаю, что это связано со временем начала окна, и это событие 1 опаздывает.Так что в этом случае, задав «размер» timeWindowAll, как узнать, какое будет время начала каждого окна?

object UnorederedTimeEvents {
  case class MyEvent(timestamp: Long, str: String)
  class MyAssignerWithPunctuatedWatermarks extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = new Watermark(extractedTimestamp)

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = element.timestamp
  }

  class MyProcessAllWindowFunction extends ProcessAllWindowFunction[MyEvent, MyEvent, TimeWindow] {
    override def process(context: Context, elements: Iterable[MyEvent], out: Collector[MyEvent]): Unit = {
      elements.foreach(out.collect)
    }
  }

  def main(args: Array[String]): Unit = {

    val events = List(MyEvent(1526056650167L, "2"), MyEvent(1526056649167L, "1"), MyEvent(1526056651167L, "3"))

    println(events.sortBy(_.timestamp))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    env
      .fromCollection(events)
      .assignTimestampsAndWatermarks(new MyAssignerWithPunctuatedWatermarks)
      .timeWindowAll(Time.seconds(10))
      .process(new MyProcessAllWindowFunction)
      .print()

    env.execute()

  }

}

1 Ответ

0 голосов
/ 20 мая 2018

Преобразовывая ваши временные метки в человеческое время, мы видим, что они в это время (UTC):

2: Friday, May 11, 2018 4:37:30.167 PM
1: Friday, May 11, 2018 4:37:29.167 PM
3: Friday, May 11, 2018 4:37:31.167 PM

Теперь давайте рассмотрим этот случай:

.timeWindowAll(Time.seconds(10)) --> 2, 3

Временные окна выровнены почасы, а не события.В этом случае есть одно десятисекундное окно с 4:37:20 до 4:37:30, а другое с 4:37:30 до 4:37:40.Вот почему событие # 1 отброшено.Событие № 2, которое было обработано первым, установило водяной знак 4: 37: 30.167, и без допустимого опоздания событию № 1 некуда идти.Назначитель окна просто отбрасывает его.

С другой стороны, в этом случае

.timeWindowAll(Time.seconds(4)) --> 2, 1, 3

с окнами длиной 4 секунды все три события попадают в окно, которое начинается в 4:37:28.Событие # 1 все еще запаздывает, но окно, к которому оно относится, еще не очищено (и не будет, пока водяной знак не достигнет 4:37:32), поэтому событие # 1 включено в окно.

Для чего бы то ни было, типичная стратегия при работе с неупорядоченными событиями состоит в том, чтобы отрегулировать водяные знаки, чтобы, по крайней мере, учесть любое количество неупорядоченности, ожидаемое для вашего приложения (а не вообще никакого, как вы сделали здесь).).А оконный API Flink также поддерживает поздние события (то есть события настолько поздние, что задержка водяного знака была недостаточной).

Документы: Время события и водяные знаки , Поздность .

...