При взаимодействии с внешними системами я бы рекомендовал использовать Flink's asyn c Оператор ввода-вывода . Он позволяет вам выполнять асинхронные задачи, не блокируя выполнение оператора.
Если вы хотите повторить неудачные операции, не перезапуская задание Flink с последней успешной контрольной точки, я бы предложил реализовать политику повторных попыток самостоятельно. Это может выглядеть следующим образом:
new AsyncFunction<IN, OUT>() {
@Override
public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
FutureUtils
.retrySuccessfulWithDelay(
() -> triggerAsyncOperation(input),
Time.seconds(1L),
Deadline.fromNow(Duration.ofSeconds(10L)),
this::decideWhetherToRetry,
new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()))
.whenComplete((result, throwable) -> {
if (result != null) {
resultFuture.complete(Collections.singleton(result));
} else {
resultFuture.completeExceptionally(throwable);
}
})
}
}
с triggerAsyncOperation
, инкапсулирующим вашу асинхронную операцию, и decideWhetherToRetry
, инкапсулирующим вашу стратегию повторения. Если decideWhetherToRetry
возвращает true
, то resultFuture
будет завершено со значением этой попытки операции.
Если resultFuture
выполнено исключительно, то это вызовет аварийное переключение, которое приведет к выполнению задания перезапустите с этой последней успешной контрольной точки.