данные flink не обрабатываются функцией процесса в операторе временного окна - PullRequest
0 голосов
/ 19 февраля 2020

У меня есть временное окно, которое я пытаюсь определить, получу ли я новый ключ в течение определенного периода времени. Я отправляю данные через kafka, и когда я отлаживаю их, я вижу, что данные попадают в метод keyby, но не достигают метода process и не собираются сборщиком. Я использую BoundedOutOfOrdernessTimestampExtractor для назначения водяных знаков:

    case class Src(qip:Ip, ref: Ip, ts: Long) extends FooRequest

    class TsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.hours(3)){
      override def extractTimestamp(element: Src): Long = element.ts
    }

    class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow]{
      private lazy val stateDescriptor = new ValueStateDescriptor("refFilter",  createTypeInformation[String])

      override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
        println(s"RefIpFilter processing $key")//data is not getting here 
        if(Option(context.windowState.getState(stateDescriptor).value()).isEmpty){
          println(s"new key found $key") //data is not getting here also 
          context.windowState.getState(stateDescriptor).update(key)
          out.collect(elements.head)
        }
      }
    }

lazy val env: StreamExecutionEnvironment =
    setupEnv(StreamExecutionEnvironment.getExecutionEnvironment)(300000,Some(stateDir), Some(TimeCharacteristic.EventTime))

 lazy val src: DataStream[FooRequest] = env.addSource(consumer)

 lazy val uniqueRef:DataStream[FooRequest] => DataStream[Src] = src => src 
        .flatMap(new FlatMapFunction[FooRequest,Src ]{
          override def flatMap(value: FooRequest, out: Collector[Src]): Unit =   value match {
            case r: Src =>
              out.collect(r)
            case invalid =>
              log.warn(s"filtered unexpected request $invalid")
          }
        })
        .assignTimestampsAndWatermarks(new TsExtractor)
        .keyBy(r => r.ref)
        .timeWindow(Time.seconds(120))
        .allowedLateness(Time.seconds(360))
        .process(new RefFilter)

uniqueRef(src).addSink(sink)
env.execute()

любая помощь будет принята с благодарностью

1 Ответ

0 голосов
/ 20 февраля 2020

BoundedOutOfOrdernessTimestampExtractor отслеживает наибольшую отметку времени, которую он когда-либо видел, и создает водяные знаки, которые следуют за этим из-за заданной задержки (в данном случае три часа). Эти водяные знаки создаются периодически, каждые 200 мсек c по умолчанию. Таким образом, при наличии только одного события водяной знак будет на 3 часа позже этого события, и окно никогда не сработает. Кроме того, при ограниченном вводе задание будет остановлено после обработки всех событий.

context.windowState - это состояние для каждого окна с ограниченным сроком службы. Каждое 2-минутное окно будет иметь свой собственный экземпляр, и оно очищается после истечения допустимого времени ожидания для окна. Если вы хотите, чтобы состояние окна с ключами было глобальным, с неопределенным временем жизни, используйте context.globalState.

...