Apache Flink - обновить конфигурацию внутри оператора без использования состояния широковещания - PullRequest
0 голосов
/ 20 января 2020

Мы используем flink для выполнения http-вызовов для каждого события, и для этого требуются определенные данные, которые хранятся в БД. Эти данные обновляются примерно раз в неделю. Это обновление должно go для оператора.

Есть ли способ обновить эти данные внутри оператора без использования широковещательного потока, так как мы пытаемся сохранить количество потоков низким в нашей архитектуре, а также потому, что изменения в данных являются частыми?

1 Ответ

1 голос
/ 20 января 2020

Возможные варианты:

A) Вы можете просто использовать ProcessFunction с таймером и извлекать изменения каждые X минут.

B) Если ваше состояние небольшое и перезапуски не Слишком критично: ваш сервер запрашивает сбои, если вы не обновите свои данные (403?). Тогда вы можете просто загрузить данные в open и вызвать оператора, когда вы получите 403 с и восстановитесь.

edit:

Пример того, как A) может работать. Предполагая, что у вас есть

Источник (Запись) -> MyAsyncFun c (Вывод) -> Мойка (Вывод)

Я бы go и добавил еще одну функцию

Источник (Запись) -> ConfFetcher (Tuple2 (Запись, Conf)) -> MyAsyncFun c (Вывод) -> Sink (Вывод)

edit2:

Как вы указали в комментирует таймер Flink, связанный с состоянием ключа. Однако для этого случая использования нам вообще не нужно использовать какой-либо таймер Flink, а просто используйте Java Timers.

private static class PullConfig<T> extends RichMapFunction<T, Tuple2<T, Conf>> {
    private transient ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
    private transient volatile Conf conf;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        service.scheduleWithFixedDelay(this::pullConfig, 0, 1, TimeUnit.HOURS);
    }

    void pullConfig() {
        conf = ...
    }


    @Override
    public Tuple2<T, Conf> map(T value) throws Exception {
        return new Tuple2(value, conf);
    }
    ...
}
...