Мои требования - проверить, начинаются ли события и события успеха в сеансе. Конечно, я использую сессионное окно. Но, похоже, в каждом ключе перекрываются окна. Я искал в Интернете и не мог знать почему.
Формат данных: myForm(timestamp, roomId, role, sessionId, event)
, например:
myform(1559128942, 123, kid, 37890, begin) # timestamp equals to 2019-05-29 19:22:22.605
myform(1559128944, 123, kid, 37890, success) # timestamp equals to 2019-05-29 19:22:24.844
myform(1559129977, 456, kid, 38239, begin) # timestamp equals to 2019-05-29 19:39:37
...
сеанс может иметь только одну пару событий начала и успеха, также может быть несколько пар событий начала и успеха.
событие может прибыть поздно, и допускается до максимума 3 минуты.
Мой ключ roomId
+ role
+ sessionId
как '123_kid_37890', seesionGap
- 60 с
// use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = ... // from kafka, steam of myform
val sessionStream = stream
.assignTimestampsAndWatermarks(new MyFormEventWatermarks(0L))
.keyBy(mf => mf.roomId + "_" + mf.role + "_" + mf.sessionId)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(60 * 1000L))
.allowedLateness(Time.minutes(3))
.apply(myFormWindowFunction)
//MyFormEventWatermarks is :
class MyFormEventWatermarks[T <: AbstractForm](dely: Long) extends AssignerWithPeriodicWatermarks[T] {
private var currentMaxTimestamp = Long.MinValue
val maxOutOfOrderness = dely
@transient
var waterMark : Watermark = null
override def getCurrentWatermark: Watermark = {
if (currentMaxTimestamp == Long.MinValue){
waterMark = new Watermark(Long.MinValue)
waterMark
}
else{
waterMark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
waterMark
}
}
override def extractTimestamp(data: T, previousElementTimestamp: Long): Long = {
val timestamp = data.timestamp
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
timestamp
}
}
//window func is
class myFormWindowFunction extends RichWindowFunction ... {
...
override def apply(key: String, window: TimeWindow, input: Iterable[myForm], out: Collector[List[myForm]]): Unit = {
println("window is " + window.getStart() + "-" + window.getEnd() + "|" + data.tostring)
}
...
}
В методе apply
из myFormWindowFunction
, результаты println
как:
// like this session data:
myform(1559128942, 123, kid, 37890, begin) # timestamp equals to 2019-05-29 19:22:22.605
myform(1559128944, 123, kid, 37890, success) # timestamp equals to 2019-05-29 19:22:24.844
У меня есть окно 2019-05-29 19:22:22.605- 2019-05-29 19:23:22.605
, данные myform(1559128942, 123, kid, 37890, begin)
, затем я получил второе окно 2019-05-29 19:22:22.605 - 2019-05-29 19:23:24.844
и данные myform(1559128942, 123, kid, 37890, begin), myform(1559128944, 123, kid, 37890, success)
.
Похоже, окно инициализируется до (2019-05-29 19: 22: 22.605, 2019-05-29 19: 23: 22.605) и (2019-05-29 19: 22: 24.844, 2019-05-29 19:23 : 24.844) и метод onMerge
объединены, но не «пропущены» (2019-05-29 19: 22: 22.605, 2019-05-29 19: 23: 22.605). Я посмотрел функцию источника EventTimeSessionWindows
и примеры окна сеанса flink, и до сих пор не знаю, где программа работает неправильно?