Я использую потребитель Kafka с Flink 1.9 (в Scala 2.12) и сталкиваюсь со следующей проблемой (аналогично этот вопрос ): потребитель должен прекратить получать данные (и завершить задачу), когда нетновые сообщения принимаются в течение определенного промежутка времени (поскольку поток потенциально бесконечен, поэтому в самой теме нет сообщения об окончании потока).
Я пытался использовать ProcessFunction
который вызывает consumer.close()
, но это не помогло (потребитель продолжает работать). Создание исключения в ProcessFunction
полностью убивает задание, а это не то, чего я хочу (поскольку задание состоит из нескольких этапов, которые отменяются после создания исключения). Вот моя ProcessFunction:
class TimeOutFunction( // delay after which an alert flag is thrown
val timeOut: Long, consumer: FlinkKafkaConsumer[Row]
) extends ProcessFunction[Row, Row] {
// state to remember the last timer set
private var lastTimer: ValueState[Long] = _
override def open(conf: Configuration): Unit = { // setup timer state
val lastTimerDesc = new ValueStateDescriptor[Long]("lastTimer", classOf[Long])
lastTimer = getRuntimeContext.getState(lastTimerDesc)
}
override def processElement(value: Row, ctx: ProcessFunction[Row, Row]#Context, out: Collector[Row]): Unit = { // get current time and compute timeout time
val currentTime = ctx.timerService.currentProcessingTime
val timeoutTime = currentTime + timeOut
// register timer for timeout time
ctx.timerService.registerProcessingTimeTimer(timeoutTime)
// remember timeout time
lastTimer.update(timeoutTime)
// throughput the event
out.collect(value)
}
override def onTimer(timestamp: Long, ctx: ProcessFunction[Row, Row]#OnTimerContext, out: Collector[Row]): Unit = {
// check if this was the last timer we registered
if (timestamp == lastTimer.value) {
// it was, so no data was received afterwards.
// stop the consumer.
consumer.close()
}
}
}
Метод isEndOfStream()
в схеме десериализации также бесполезен, поскольку требует nextElement
(а мой случай вроде наоборот, так как поток должен остановитьсякогда следующего элемента не будет в течение некоторого времени).
Итак, есть способ сделать это (желательно без подкласса FlinkKafkaConsumer
и / или с использованием отражения)?