Стоп Flink Kafka потребительская задача программно - PullRequest
2 голосов
/ 08 октября 2019

Я использую потребитель Kafka с Flink 1.9 (в Scala 2.12) и сталкиваюсь со следующей проблемой (аналогично этот вопрос ): потребитель должен прекратить получать данные (и завершить задачу), когда нетновые сообщения принимаются в течение определенного промежутка времени (поскольку поток потенциально бесконечен, поэтому в самой теме нет сообщения об окончании потока).

Я пытался использовать ProcessFunction который вызывает consumer.close(), но это не помогло (потребитель продолжает работать). Создание исключения в ProcessFunction полностью убивает задание, а это не то, чего я хочу (поскольку задание состоит из нескольких этапов, которые отменяются после создания исключения). Вот моя ProcessFunction:

class TimeOutFunction( // delay after which an alert flag is thrown
  val timeOut: Long, consumer: FlinkKafkaConsumer[Row]
) extends ProcessFunction[Row, Row] {
  // state to remember the last timer set
  private var lastTimer: ValueState[Long] = _

  override def open(conf: Configuration): Unit = { // setup timer state
    val lastTimerDesc = new ValueStateDescriptor[Long]("lastTimer", classOf[Long])
    lastTimer = getRuntimeContext.getState(lastTimerDesc)
  }

  override def processElement(value: Row, ctx: ProcessFunction[Row, Row]#Context, out: Collector[Row]): Unit = { // get current time and compute timeout time
    val currentTime = ctx.timerService.currentProcessingTime
    val timeoutTime = currentTime + timeOut
    // register timer for timeout time
    ctx.timerService.registerProcessingTimeTimer(timeoutTime)
    // remember timeout time
    lastTimer.update(timeoutTime)
    // throughput the event
    out.collect(value)
  }

  override def onTimer(timestamp: Long, ctx: ProcessFunction[Row, Row]#OnTimerContext, out: Collector[Row]): Unit = {
    // check if this was the last timer we registered
    if (timestamp == lastTimer.value) {
      // it was, so no data was received afterwards.
      // stop the consumer.
      consumer.close()
    }
  }
}

Метод isEndOfStream() в схеме десериализации также бесполезен, поскольку требует nextElement (а мой случай вроде наоборот, так как поток должен остановитьсякогда следующего элемента не будет в течение некоторого времени).

Итак, есть способ сделать это (желательно без подкласса FlinkKafkaConsumer и / или с использованием отражения)?

...