Как обрабатывать исключения в Apache flink KeyedBroadCastProcessFunction - PullRequest
1 голос
/ 10 июля 2020

Я новичок в Flink, я выполняю оценку шаблона с помощью Flink KeyedBroadCastProcessFunction, что-то похожее на (https://flink.apache.org/2019/06/26/broadcast-state.html), и я использую JAVA для разработки своего кода, но я не получаю как я могу обработать исключение, если во время обработки потока данных произошел какой-либо сбой, я много искал, но не получил Я оказался ниже двух ссылок

Flink: как лучше всего обрабатывать исключения внутри заданий Flink

Apache Flink - обработка исключений в «keyBy»

Согласно первой ссылке, пользователь сказал, что использует побочный вывод в processfn для фиксации ошибок. Я также использую побочный вывод в моей программе для отправки данных, которые не соответствуют шаблонам, но я не понял, как обрабатывать ошибки и недопустимые данные на одном и том же боковом выходе.

Согласно второй ссылке, которую пользователь пытается добавить перейти к функции keyby с нулевым ключом и функцией printtsink, которую я вообще не понимал

Может ли кто-нибудь помочь мне с вещами ниже

* 10 19 * 1) Любая документация или небольшой фрагмент кода для обработки исключений. Я ничего не нашел на сайте документации flink. 2) Рекомендации по обработке исключений flink

Я не нашел никаких действующих ресурсов в Интернете, если кто-то может ответить, это будет полезно для дальнейших ссылок на другие также

1 Ответ

0 голосов
/ 10 июля 2020

Вы можете иметь столько побочных выходов из ProcessFunction, сколько хотите - каждый будет иметь свой уникальный OutputTag. Таким образом, вы можете использовать один для несопоставленных данных, а другой - для ошибок. может выглядеть примерно так:

final OutputTag<String> errors = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<T> task1 = ...;
SingleOutputStreamOperator<T> task2 = ...;
SingleOutputStreamOperator<T> task3 = ...;

DataStream<String> exceptions1 = task1.getSideOutput(errors);
DataStream<String> exceptions2 = task2.getSideOutput(errors);
DataStream<String> exceptions3 = task3.getSideOutput(errors);

DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);

exceptions.addSink(new FlinkKafkaProducer(...));
...