Как пропустить испорченные сообщения в Flink
У меня есть DAG: KafkaSrcConsumer> FlatMap> Window> SinkFunction
Теперь, если я получаю поврежденное сообщение от Кафки в операторе "KafkaSrcConsumer", я хочу выбросить / пропустить это сообщение и не хочу пересылать это поврежденное сообщение следующему оператору "FlatMap"
Как мы можем достичь этого в Apache Flink?
(Примечание: создание исключения из KafkaSrcConsumer перезапустит задание flink, и я хочу избежать этого, так как я просто хочу пропустить сообщение и перейти к следующему сообщению)