Как десериализовать внешний манифест контрольной точки во Flink? - PullRequest
0 голосов
/ 23 мая 2019

Я использую Flink 1.4.2 с инкрементными контрольными точками с RocksDB и сохраняю контрольные точки в корзину S3. Структура контрольной точки - это файл манифеста, который указывает на некоторые файлы, содержащие состояние. Когда я открываю файл манифеста в текстовом редакторе, я вижу нечитаемые куски и некоторые URL-адреса s3.

Как можно десериализовать этот файл манифеста, чтобы получить список URL-адресов S3?

Ответы [ 2 ]

1 голос
/ 23 мая 2019

Оба https://github.com/king/bravo и https://github.com/sjwiesman/flink/tree/savepoint-connector содержат разъемы, которые могут считывать и записывать точки сохранения / контрольные точки.Вы можете найти один или оба полезных, либо напрямую, либо в качестве примера.Посмотрите этот билет Jira - https://issues.apache.org/jira/browse/FLINK-12047 - чтобы следить за текущей работой, чтобы создать лучшие инструменты для работы со снимками Flink.

0 голосов
/ 23 мая 2019

Класс SavepointStore формы Apache Flink Runtime Library содержит методы для хранения и загрузки точек сохранения.

Только для моего текущего сценария я создал этот фрагмент для извлечения файлов, связанных с контрольной точкой.

import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

import java.io.IOException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CheckpointFileLocator {

    public static void main(String[] args) throws IOException {
        System.out.println(new CheckpointFileLocator()
                .getS3Locations("/Users/ezequiel/Downloads/chk-3-checkpoint_metadata-f350e54becb2"));
    }

    public Set<String> getS3Locations(String manifestPath) throws IOException {
        Savepoint savepoint = SavepointStore.loadSavepoint(manifestPath, this.getClass().getClassLoader());

        Stream<String> rawStream = savepoint.getOperatorStates().stream()
                .flatMap(operatorState -> operatorState.getSubtaskStates().values().stream())
                .flatMap(operatorSubtaskState -> operatorSubtaskState.getRawKeyedState().stream())
                .map(keyedStateHandle -> (KeyGroupsStateHandle) keyedStateHandle)
                .map(KeyGroupsStateHandle::getDelegateStateHandle)
                .map(this::getPath);

        Stream<String> metadataStream = savepoint.getOperatorStates().stream()
                .flatMap(operatorState -> operatorState.getSubtaskStates().values().stream())
                .flatMap(operatorSubtaskState -> operatorSubtaskState.getManagedKeyedState().stream())
                .map(keyedStateHandle -> (IncrementalKeyedStateHandle) keyedStateHandle)
                .map(IncrementalKeyedStateHandle::getMetaStateHandle)
                .map(this::getPath);

        return Stream.concat(rawStream, metadataStream).collect(Collectors.toSet());
    }

    private String getPath(StreamStateHandle streamStateHandle) {
        if (streamStateHandle instanceof FileStateHandle) {
            return ((FileStateHandle) streamStateHandle).getFilePath().toString();
        } else if (streamStateHandle instanceof ByteStreamStateHandle) {
            return ((ByteStreamStateHandle) streamStateHandle).getHandleName();
        }
        return null;
    }

}
...