Почему я получаю более одного окна на ключ в окне сеанса Flink - PullRequest
0 голосов
/ 29 мая 2019

Мои требования - проверить, начинаются ли события и события успеха в сеансе. Конечно, я использую сессионное окно. Но, похоже, в каждом ключе перекрываются окна. Я искал в Интернете и не мог знать почему.

Формат данных: myForm(timestamp, roomId, role, sessionId, event), например:

myform(1559128942, 123, kid, 37890, begin) # timestamp equals to 2019-05-29 19:22:22.605  
myform(1559128944, 123, kid, 37890, success) # timestamp equals to 2019-05-29 19:22:24.844  
myform(1559129977, 456, kid, 38239, begin) # timestamp equals to 2019-05-29 19:39:37  
...

сеанс может иметь только одну пару событий начала и успеха, также может быть несколько пар событий начала и успеха.
событие может прибыть поздно, и допускается до максимума 3 минуты.

Мой ключ roomId + role + sessionId как '123_kid_37890', seesionGap - 60 с

// use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = ... // from kafka, steam of myform
val sessionStream = stream
    .assignTimestampsAndWatermarks(new MyFormEventWatermarks(0L))
    .keyBy(mf => mf.roomId + "_" + mf.role + "_" + mf.sessionId)
 .window(EventTimeSessionWindows.withGap(Time.milliseconds(60 * 1000L))      
.allowedLateness(Time.minutes(3))
.apply(myFormWindowFunction)

//MyFormEventWatermarks is :
class MyFormEventWatermarks[T <: AbstractForm](dely: Long) extends AssignerWithPeriodicWatermarks[T] {

  private var currentMaxTimestamp = Long.MinValue
  val maxOutOfOrderness = dely

  @transient
  var waterMark : Watermark = null

  override def getCurrentWatermark: Watermark = {
    if (currentMaxTimestamp == Long.MinValue){
      waterMark = new Watermark(Long.MinValue)
      waterMark
    }
    else{
      waterMark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
      waterMark
    }
  }

  override def extractTimestamp(data: T, previousElementTimestamp: Long): Long = {
    val timestamp = data.timestamp
    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
    timestamp
  }


}


//window func is 
class myFormWindowFunction extends RichWindowFunction ... {
    ...
    override def apply(key: String, window: TimeWindow, input: Iterable[myForm], out: Collector[List[myForm]]): Unit = {
        println("window is " + window.getStart() + "-" + window.getEnd() + "|" + data.tostring)

    }
    ...

}

В методе apply из myFormWindowFunction, результаты println как:

// like this session data:
myform(1559128942, 123, kid, 37890, begin) # timestamp equals to 2019-05-29 19:22:22.605  
myform(1559128944, 123, kid, 37890, success) # timestamp equals to 2019-05-29 19:22:24.844  

У меня есть окно 2019-05-29 19:22:22.605- 2019-05-29 19:23:22.605, данные myform(1559128942, 123, kid, 37890, begin), затем я получил второе окно 2019-05-29 19:22:22.605 - 2019-05-29 19:23:24.844 и данные myform(1559128942, 123, kid, 37890, begin), myform(1559128944, 123, kid, 37890, success). Похоже, окно инициализируется до (2019-05-29 19: 22: 22.605, 2019-05-29 19: 23: 22.605) и (2019-05-29 19: 22: 24.844, 2019-05-29 19:23 : 24.844) и метод onMerge объединены, но не «пропущены» (2019-05-29 19: 22: 22.605, 2019-05-29 19: 23: 22.605). Я посмотрел функцию источника EventTimeSessionWindows и примеры окна сеанса flink, и до сих пор не знаю, где программа работает неправильно?

Ответы [ 2 ]

0 голосов
/ 31 мая 2019

Поведение по умолчанию для событий, назначенных окну в течение разрешенного интервала задержек, состоит в том, чтобы запускать окно при добавлении каждого позднего события в окно - но также возможно реализовать пользовательский триггер, который срабатывает по истечении допустимого опоздание, либо вместо, либо в дополнение к другим выстрелам.

Обратите внимание, что в случае окон сеанса событие, которое приходит поздно, может вызвать позднее слияние.

Вы можете подумать о компромиссах между задержкой нанесения водяного знака и допустимой задержкой. Поскольку задержка вашего водяного знака равна нулю, вы, вероятно, будете иметь довольно много поздних событий (каждый раз, когда поток событий не в порядке по отметке времени). Например, если вместо этого вы используете 3 минуты в качестве задержки водяного знака и установите допустимое время задержки равным нулю, то вы получите те же окончательные результаты, но без поздних срабатываний и поздних слияний - но с задержкой в ​​3 минуты до начального запуск каждого окна.

0 голосов
/ 30 мая 2019

Я обнаружил проблему, я неправильно понял допустимый опоздание.Когда оно используется, окно сохраняется, а когда наступает время window + allowLateness, окно снова запускается.

...