Apache Flink - обработка исключений в "keyBy" - PullRequest
1 голос
/ 02 апреля 2019

Может случиться, что данные, поступающие в задание Flink, вызывают исключение либо из-за ошибки в коде, либо из-за отсутствия проверки.Моя цель - обеспечить согласованный способ обработки исключений, который наша команда могла бы использовать в заданиях Flink, что не вызовет простоев в работе.

  1. Стратегии перезапуска здесь не применимы, так как:

    • простой перезапуск не решит проблему, и мы попадаем в цикл перезапуска
    • мы не можем просто пропустить событие
    • они могут быть полезны для OOME или некоторых временных проблем
    • мы не можем добавить пользовательский
  2. блок try / catch в функции "keyBy" не полностью помогает, так как:

    • нет способа пропустить событие в «keyBy» после обработки исключения

Пример кода:

env.addSource(kafkaConsumer)
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");

Я хотел бы иметь возможность пропуститьобработка события, вызвавшего проблему в «keyBy» и аналогичных методах, которые должны возвращать ровно один результат.

Ответы [ 2 ]

1 голос
/ 02 апреля 2019

Кроме предложения @ phanhuy152 (что мне кажется вполне законным), почему бы не filter до keyBy?

env.addSource(kafkaConsumer)
    .filter(invalidKeys)
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
1 голос
/ 02 апреля 2019

Можете ли вы зарезервировать специальное значение, например, "NULL", для keyBy для возврата в таком случае?Тогда ваша flatMap функция может пропустить, когда встретите такое значение?

...