Как обрабатывать временные сбои / сбои приложений в Apache Flink? - PullRequest
0 голосов
/ 31 марта 2020

Мой процессор Flink прослушивает Kafka, и бизнес-логика c в процессоре включает вызов внешних служб REST, и есть вероятность, что эти службы могут быть недоступны. Я хотел бы воспроизвести кортеж обратно в процессор, и есть ли способ сделать это? Я использовал Storm, и мы сможем отключить кортеж, чтобы он не был подтвержден. Таким образом, тот же кортеж будет воспроизведен на процессоре.

В Flink кортеж автоматически подтверждается после того, как сообщение получено Flink-Kafka Consumer. Есть способы решить эту проблему. Один из таких способов - опубликовать sh сообщение обратно в ту же очередь / очередь повторов. Но я ищу решение, подобное Storm.

Я знаю, что точка сохранения / контрольная точка Флинка будет использоваться для отказоустойчивости. Но в моем понимании, кортежи будут воспроизведены в случае неудачи Флинка. Я хотел бы получить идеи о том, как обрабатывать переходные сбои.

Спасибо

1 Ответ

0 голосов
/ 03 апреля 2020

При взаимодействии с внешними системами я бы рекомендовал использовать Flink's asyn c Оператор ввода-вывода . Он позволяет вам выполнять асинхронные задачи, не блокируя выполнение оператора.

Если вы хотите повторить неудачные операции, не перезапуская задание Flink с последней успешной контрольной точки, я бы предложил реализовать политику повторных попыток самостоятельно. Это может выглядеть следующим образом:

new AsyncFunction<IN, OUT>() {
    @Override
    public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        FutureUtils
            .retrySuccessfulWithDelay(
                () -> triggerAsyncOperation(input),
                Time.seconds(1L),
                Deadline.fromNow(Duration.ofSeconds(10L)),
                this::decideWhetherToRetry,
                new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()))
            .whenComplete((result, throwable) -> {
                if (result != null) {
                    resultFuture.complete(Collections.singleton(result));
                } else {
                    resultFuture.completeExceptionally(throwable);
                }
            })
    }
}

с triggerAsyncOperation, инкапсулирующим вашу асинхронную операцию, и decideWhetherToRetry, инкапсулирующим вашу стратегию повторения. Если decideWhetherToRetry возвращает true, то resultFuture будет завершено со значением этой попытки операции.

Если resultFuture выполнено исключительно, то это вызовет аварийное переключение, которое приведет к выполнению задания перезапустите с этой последней успешной контрольной точки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...