Почему flink 1.10.1 не загружает сохраненные состояния после flink cra sh -restart with FsStateBackend - PullRequest
0 голосов
/ 09 июля 2020

Я использую Flink 1.10.1 с FsStateBackend в качестве бэкэнда состояния для контрольных точек. У меня есть несколько операций с отслеживанием состояния, которые во время работы приложения (запущенного как приложение .jar, а не как кластер) работают должным образом, но если приложение останавливается (или выходит из строя) по какой-либо причине, состояния, которые должны храниться в файловой системе с контрольные точки не загружены, а функции не имеют предыдущих ссылок, тогда мне нужно загрузить информацию из базы данных и сохранить ее как состояние, чтобы снова работать с этими предыдущими состояниями. Должен быть способ сделать это с помощью контрольных точек и FsStateBackend без необходимости читать всю информацию из базы данных, просто перезагрузите эти состояния из уже сохраненных контрольных точек. Возможно ли это?

Вот код: Моя конфигурация контрольной точки

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8, GetConfiguration.getConfig());
final StateBackend stateBackend = new FsStateBackend(new Path("/some/path/checkpoints").toUri(), true);
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
 env.getCheckpointConfig().setCheckpointTimeout(60000);
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
            env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
            env.setRestartStrategy(RestartStrategies.noRestart());
            env.setStateBackend(stateBackend);

и это пример, которого я хочу избежать:

public class EventCountMap extends RichMapFunction<Event, EventCounter> {
    private static final MapStateDescriptor<String, Timestamp> descriptor = new MapStateDescriptor<>("previous_counter", String.class, Timestamp.class);
    private static final EventCounter eventCounter = new EventCounter();
    private MapState<String, Timestamp> previous_state;
    private static final StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(org.apache.flink.api.common.time.Time.days(1))
            .cleanupFullSnapshot()
            .build();

    @Override
    public void open(Configuration parameters) {
        descriptor.enableTimeToLive(ttlConfig);
        previous_state = getRuntimeContext().getMapState(descriptor);
    }

/*I want to avoid to call this function that load all events from db and pass them to the state to be used. This happens only once but there must be a efficient way to do this in flink.*/
    private void mapRefueled() throws Exception {
        Preconditions.checkNotNull(previous_state);
        for (Map.Entry<String, Timestamp> map : StreamingJob.update_beh_count_ts.entrySet())
            previous_state.put(map.getKey(), map.getValue());
        StreamingJob.update_beh_count_ts.clear();
    }

    @Override
    public EventCounter map(Event event) throws Exception {
        /*Refuel map state in case of failures*/
        if (!StreamingJob.update_beh_count_ts.isEmpty()) mapRefueled();
        eventCounter.date = new Date(event.timestamp.getTime());
        final String key_first = eventCounter.date.toString().concat("_ts_first");
        final String key_last = eventCounter.date.toString().concat("_ts_last");
        if (previous_state.contains(key_first) && previous_state.contains(key_last)) {
            final Timestamp first = (previous_state.get(key_first).after(event.timestamp)) ? event.timestamp : previous_state.get(key_first);
            final Timestamp last = (previous_state.get(key_last).before(event.timestamp)) ? event.timestamp : previous_state.get(key_last);
            previous_state.put(key_first, first);
            previous_state.put(key_last, last);
        } else {
            previous_state.put(key_first, event.timestamp);
            previous_state.put(key_last, event.timestamp);
        }
        eventCounter.first_event = previous_state.get(key_first);
        eventCounter.last_event = previous_state.get(key_last);
        return eventCounter;
    }
}

Надеюсь, я смогу объяснись, чтобы ты понял, что мне нужно делать. С уважением! Заранее спасибо.

1 Ответ

0 голосов
/ 09 июля 2020

Чтобы состояние из контрольной точки было загружено при перезапуске задания, вы должны явно организовать это, иначе задание будет перезапущено без загрузки контрольной точки.

См. Восстановить из точки сохранения и Продолжение с сохраненной контрольной точки для подробностей, но суть такова:

bin/flink run -s :checkpointPath [:runArgs]

Я предполагаю, что этого не делается.

С точки зрения передового опыта настройки кластера Flink для автоматического c восстановления, это зависит от того, что вы используете (Yarn, Mesos, Kubernetes, ...).

...