Получение исключения при проверке точки сохранения flink с использованием API состояния процессора - PullRequest
1 голос
/ 20 января 2020

Я получаю исключение в потоке "main" java .lang.IllegalAccessError: класс org. apache .flink.state.api.runtime.SavepointLoader пытался получить доступ к защищенному методу org. apache .flink. runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer (Ljava / языки / String;) Lorg / Apache / Flink / среда / состояние / CompletedCheckpointStorageLocation; (org. apache .flink.state.api.runtime.SavepointLoader и org. apache .flink.runtime.state.filesystem.AbstractFsCheckpointStorage находятся в безымянном модуле загрузчика 'app')

Использование flink 1,8. Использование нижеследующего репозитория maven:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-state-processor-api_2.12</artifactId>
      <version>1.9.1</version>
    </dependency>

Фрагмент исходного кода

        ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(bEnv, "/home/utlesh/Documents/savepoint", new MemoryStateBackend()) ;
        savepoint.readListState("input-events-source-01", "Custom Source", TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>(){}));

Получение исключения во второй строке, вызывающей функцию ниже

    public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
    org.apache.flink.runtime.checkpoint.savepoint.Savepoint savepoint = SavepointLoader.loadSavepoint(path);
    ...
    ...
}

Которая вызывает функцию ниже:

    package org.apache.flink.state.api.runtime;

    public static Savepoint loadSavepoint(String savepointPath) throws IOException {
        CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorage
            .resolveCheckpointPointer(savepointPath);

        try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) {
            return Checkpoints.loadCheckpointMetadata(stream, Thread.currentThread().getContextClassLoader());
        }
    }

, которая вызывает следующую функцию:

    package org.apache.flink.runtime.state.filesystem;

    protected static CompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
        checkNotNull(checkpointPointer, "checkpointPointer");
        checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer");
       ...
       ...
}

Если мы внимательно посмотрим, здесь вызывается защищенная функция другого пакета. Это ошибка в репозитории Flink Maven или я неправильно его использую? Есть ли другой способ десериализации или чтения точки сохранения и контрольной точки flink?

Ответы [ 2 ]

1 голос
/ 21 января 2020

Похоже, что несоответствие версий зависимостей для вашего flink.

Добавьте следующие зависимости в pom. xml и соберите заново, также удалите старую версию зависимости flink-клиенты из одного файла.

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>1.9.1</version>
</dependency>
0 голосов
/ 21 января 2020

API State Processor API можно использовать только в пакетных заданиях, работающих под управлением Flink 1.9 или более поздней версии, но его можно использовать для считывания точек сохранения и контрольных точек, которые были записаны потоковыми заданиями под управлением более старых версий Flink (обратно к Flink 1.6).

...