Возможные варианты:
A) Вы можете просто использовать ProcessFunction
с таймером и извлекать изменения каждые X минут.
B) Если ваше состояние небольшое и перезапуски не Слишком критично: ваш сервер запрашивает сбои, если вы не обновите свои данные (403?). Тогда вы можете просто загрузить данные в open
и вызвать оператора, когда вы получите 403 с и восстановитесь.
edit:
Пример того, как A) может работать. Предполагая, что у вас есть
Источник (Запись) -> MyAsyncFun c (Вывод) -> Мойка (Вывод)
Я бы go и добавил еще одну функцию
Источник (Запись) -> ConfFetcher (Tuple2 (Запись, Conf)) -> MyAsyncFun c (Вывод) -> Sink (Вывод)
edit2:
Как вы указали в комментирует таймер Flink, связанный с состоянием ключа. Однако для этого случая использования нам вообще не нужно использовать какой-либо таймер Flink, а просто используйте Java Timers.
private static class PullConfig<T> extends RichMapFunction<T, Tuple2<T, Conf>> {
private transient ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
private transient volatile Conf conf;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
service.scheduleWithFixedDelay(this::pullConfig, 0, 1, TimeUnit.HOURS);
}
void pullConfig() {
conf = ...
}
@Override
public Tuple2<T, Conf> map(T value) throws Exception {
return new Tuple2(value, conf);
}
...
}