Flink WaterMark And Triggers - Поздние элементы не сбрасываются во время события? - PullRequest
0 голосов
/ 01 мая 2018

Меня несколько смущает то, как Флинк работает с поздними элементами, когда ставит водяные знаки на время события.

Насколько я понимаю, когда Flink читает поток данных, время водяного знака прогрессирует при просмотре любых данных, у которых время события больше, чем у текущего водяного знака. Затем любые окна, которые охватывают время, строго меньшее, чем водяной знак, запускаются для выселения (при условии, что допуск не опоздал.

Однако возьмем этот минимальный пример:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}

object EventTimeExample {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  case class ExampleType(time: Long, value: Long)

  def main(args: Array[String]) {

    // Set up environment
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Example S3 path
    val simple = env.fromCollection(Seq(
      ExampleType(1525132800000L, 1),
      ExampleType(1525132800000L, 2) ,
      ExampleType(1525132920000L, 3),
      ExampleType(1525132800000L, 4)
    ))
      .assignAscendingTimestamps(_.time)

    val windows = simple
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
      .apply{
       (window, iter, collector: Collector[(Long, Long, String)]) => {
        collector.collect(window.getStart, window.getEnd, iter.map(_.value).toString())
      }
    }

    windows.print
    env.execute("TimeStampExample")
  }
}

Результат выполнения этого:

(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))

Однако, если мое понимание верно, 4 не следует включать в первое окно здесь, так как время водяного знака должно обновляться при достижении значения 3 record.

Теперь я понимаю, что это тривиальный пример, но непонимание этого затрудняет понимание более сложных потоков.

1 Ответ

0 голосов
/ 01 мая 2018

Ваше понимание в основном верно, но здесь происходит еще несколько вещей, которые необходимо учитывать.

Прежде всего, вы использовали assignAscendingTimestamps(), который можно использовать только тогда, когда поток событий находится в полном порядке (по метке времени), что здесь не так. Вы должны увидеть это предупреждение при запуске этого приложения:

WARN  org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor  - Timestamp monotony violated: 1525132800000 < 1525132920000

Другой фактор, работающий здесь, заключается в том, что AscendingTimestampExtractor не обновляет текущий водяной знак для каждого элемента проходящего потока. Это пример периодического генератора водяных знаков, и он будет вставлять Watermark в поток каждые n миллисекунд, где n определяется как ExecutionConfig.setAutoWatermarkInterval(...), по умолчанию 200 мсек. Вот как событие № 4 пробирается в первое окно.

Чтобы получить ожидаемые результаты, вы можете реализовать пунктуированный генератор водяных знаков, настроенный на создание водяного знака для каждого события:

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[ExampleType] {
  override def extractTimestamp(element: ExampleType, previousElementTimestamp: Long): Long = {
    element.time
  }

  override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp)
  }
}

который вы затем использовали бы так:

val simple = env.fromCollection(Seq(
  ExampleType(1525132800000L, 1),
  ExampleType(1525132800000L, 2) ,
  ExampleType(1525132920000L, 3),
  ExampleType(1525132800000L, 4)
))
  .assignTimestampsAndWatermarks(new PunctuatedAssigner)

Теперь ваш пример дает следующие результаты:

(1525132800000,1525132860000,List(1, 2))
(1525132920000,1525132980000,List(3))

Событие № 4 было отброшено, потому что уже поздно. Это можно отрегулировать, ослабив генератор водяных знаков, чтобы учесть некоторую степень неупорядоченности. Например,

override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
  new Watermark(extractedTimestamp - 200000)
}

, который затем дает следующие результаты:

(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))

Или вы можете настроить окна для разрешения поздних событий

val windows = simple
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
  .allowedLateness(Time.seconds(200))
  ...

, после чего первое окно срабатывает дважды:

(1525132800000,1525132860000,List(1, 2))
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))

Обратите внимание, что, поскольку обработка водяных знаков накладывает определенные накладные расходы, обычно не требуется использовать акцентированные водяные знаки таким образом (с водяным знаком для каждого события). Для большинства применений периодические водяные знаки на основе BoundedOutOfOrdernessTimestampExtractor являются лучшим выбором.

...