Может случиться, что данные, поступающие в задание Flink, вызывают исключение либо из-за ошибки в коде, либо из-за отсутствия проверки.Моя цель - обеспечить согласованный способ обработки исключений, который наша команда могла бы использовать в заданиях Flink, что не вызовет простоев в работе.
Стратегии перезапуска здесь не применимы, так как:
- простой перезапуск не решит проблему, и мы попадаем в цикл перезапуска
- мы не можем просто пропустить событие
- они могут быть полезны для OOME или некоторых временных проблем
- мы не можем добавить пользовательский
блок try / catch в функции "keyBy" не полностью помогает, так как:
- нет способа пропустить событие в «keyBy» после обработки исключения
Пример кода:
env.addSource(kafkaConsumer)
.keyBy(keySelector) // must return one result for one entry
.flatMap(mapFunction) // we can skip some entries here in case of errors
.addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
Я хотел бы иметь возможность пропуститьобработка события, вызвавшего проблему в «keyBy» и аналогичных методах, которые должны возвращать ровно один результат.