Flink инкрементная контрольная точка - как долго данные хранятся в папке общего доступа - PullRequest
1 голос
/ 07 апреля 2020

Мы используем Flink 1.6.3 и сохраняем контрольную точку в CEPH, сохраняя только одну контрольную точку за раз, используя инкрементную и использующую пороги.

Мы запускаем windows с опозданием на 3 дня, что означает что мы ожидаем, что никакие данные в общей папке контрольных точек не будут сохраняться через 3-4 дня. Тем не менее, мы видим, что есть данные из более чем
например
Если сегодня 7/4, то есть некоторые файлы из 2/4

Иногда мы видим контрольные точки, которые мы предполагаем (из-за того, что его индексный номер не синхронизирован), что он принадлежит заданию, которое уничтожено, и контрольная точка не использовалась для восстановления задания

Мои вопросы:

  • Почему мы видим данные, более старые из конфигурации запаздывания
  • Как узнать, что файлы принадлежат действительной контрольной точке, а не контрольной точке сломанная работа - поэтому мы можем удалить эти файлы

enter image description here

1 Ответ

1 голос
/ 10 мая 2020

После расследования и с помощью Юнь Тана (apache -flink-user-mailing-list)
я создал следующий код
metadataPath - путь к файлу _metadata, который находится в папке контрольной точки / точки сохранения
Это было проверено на версии Flink 1.6.3

        DataInputStream in = new DataInputStream(new FileInputStream(metadataPath));
        final Savepoint savepoint = Checkpoints.loadCheckpointMetadata(in, CheckpointTool.class.getClassLoader());

        final Set<String> pathSharedFromMetadata = savepoint.getOperatorStates().stream()
                .flatMap(operatorState -> operatorState.getSubtaskStates().values().stream()
                        .flatMap(operatorSubtaskState -> operatorSubtaskState.getManagedKeyedState().stream()
                                .flatMap(keyedStateHandle -> Stream.concat(((IncrementalKeyedStateHandle) keyedStateHandle).getSharedState().values().stream(),
                                        ((IncrementalKeyedStateHandle) keyedStateHandle).getPrivateState().values().stream())
                                        .map(streamStateHandle -> {
                                            String name = null;
                                            if (streamStateHandle instanceof FileStateHandle) {
                                                name = ((FileStateHandle) streamStateHandle).getFilePath().getName();
                                            } else {
                                                final String handleName = ((ByteStreamStateHandle) streamStateHandle).getHandleName();
                                                name = new File(handleName).getName();
                                            }
                                            return name.trim();

                                        })
                                )
                        )
                )
                .collect(Collectors.toSet());
        System.out.println("pathSharedFromMetadata:" + pathSharedFromMetadata); 
...