У меня есть временное окно, которое я пытаюсь определить, получу ли я новый ключ в течение определенного периода времени. Я отправляю данные через kafka, и когда я отлаживаю их, я вижу, что данные попадают в метод keyby
, но не достигают метода process
и не собираются сборщиком. Я использую BoundedOutOfOrdernessTimestampExtractor
для назначения водяных знаков:
case class Src(qip:Ip, ref: Ip, ts: Long) extends FooRequest
class TsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.hours(3)){
override def extractTimestamp(element: Src): Long = element.ts
}
class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow]{
private lazy val stateDescriptor = new ValueStateDescriptor("refFilter", createTypeInformation[String])
override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
println(s"RefIpFilter processing $key")//data is not getting here
if(Option(context.windowState.getState(stateDescriptor).value()).isEmpty){
println(s"new key found $key") //data is not getting here also
context.windowState.getState(stateDescriptor).update(key)
out.collect(elements.head)
}
}
}
lazy val env: StreamExecutionEnvironment =
setupEnv(StreamExecutionEnvironment.getExecutionEnvironment)(300000,Some(stateDir), Some(TimeCharacteristic.EventTime))
lazy val src: DataStream[FooRequest] = env.addSource(consumer)
lazy val uniqueRef:DataStream[FooRequest] => DataStream[Src] = src => src
.flatMap(new FlatMapFunction[FooRequest,Src ]{
override def flatMap(value: FooRequest, out: Collector[Src]): Unit = value match {
case r: Src =>
out.collect(r)
case invalid =>
log.warn(s"filtered unexpected request $invalid")
}
})
.assignTimestampsAndWatermarks(new TsExtractor)
.keyBy(r => r.ref)
.timeWindow(Time.seconds(120))
.allowedLateness(Time.seconds(360))
.process(new RefFilter)
uniqueRef(src).addSink(sink)
env.execute()
любая помощь будет принята с благодарностью