Обработка исключений глобально для конвейера данных - PullRequest
0 голосов
/ 17 января 2019

У меня есть конвейер данных 5 различных задач. если в какой-либо задаче есть какое-либо исключение, то перенесите его в раздел ошибок kafka. есть ли обработчик исключений

1 Ответ

0 голосов
/ 17 января 2019

Я бы порекомендовал использовать функцию побочного вывода Flink для сбора исключений и их вывода в тему Kafka.

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> task1 = ...;
SingleOutputStreamOperator<Integer> task2 = ...;
SingleOutputStreamOperator<Integer> task3 = ...;
DataStream<String> exceptions1 = task1.getSideOutput(outputTag);
DataStream<String> exceptions2 = task2.getSideOutput(outputTag);
DataStream<String> exceptions3 = task3.getSideOutput(outputTag);

DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);
exceptions.addSink(new FlinkKafkaProducer(...));

Обновление

Вы также можете заключить свой результат в Left и исключения в Right типа Either. В конце вашего конвейера вам нужно разделить поток на полезную нагрузку и исключения с помощью функции split/select.

DataStream<Either<Payload, Exception>> stage2 = stage1.flatMap(...);
DataStream<Either<Payload2, Exception>> stage3 = stage2.flatMap((Either<Payload, Exception> payload, Collector out) -> {
    if (payload.isLeft()) {
        out.collect(Left.of(map(payload.left)));
    } else {
        out.collect(Right.of(payload.right()));
    }   
});

SplitStream<Either<Payload2, Exception>> split = stage3.split((Either<Payload2, Exception> value) -> {
    if (value.isLeft()) {
        return Colletions.singleton("left");
    } else {
        return Collections.singleton("right");
    }
});

DataStream<Either<Payload2, Exception>> payloads = split.select("left");
DataStream<Either<Payload2, Exception>> exceptions = split.select("right");
...