Не сериализуемый объект в Apache Flink - PullRequest
1 голос
/ 13 января 2020

Я использую Apache Flink для выполнения аналитики потоковых данных.

Я использую зависимость , объект которой для создания занимает более 10 секунд, поскольку он читает несколько существующих файлов. в hdfs до инициализации.

Если я инициализирую объект в открытом методе, я получаю исключение тайм-аута, а если в конструкторе приемника / плоской карты, я получаю исключение сериализации.

В настоящее время я использую stati c block для инициализации объекта в каком-либо другом классе, используя Preconditions.checkNotNull (MGenerator.mGenerator) в главном файле, а затем он работает, если используется в плоской карте приемника.

Есть ли способ создать объект не сериализуемой зависимости, который может занять более 10 секунд для инициализации в плоской карте или приемнике Флинка?

public class DependencyWrap {

  static MGenerator mGenerator;

  static {
    final String configStr = "{}";
    final Config config = new Gson().fromJson(config, Config.class);
    mGenerator = new MGenerator(config);
  }

}
public class MyStreaming {

  public static void main(String[] args) throws Exception {

    Preconditions.checkNotNull(MGenerator.mGenerator);
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);
    ...
    input.flatMap(new RichFlatMapFunction<Map<String,Object>,List<String>>() {

      @Override
      public void open(Configuration parameters) {
      }

      @Override
      public void flatMap(Map<String,Object> value, Collector<List<String>> out) throws Exception {

        out.collect(MFVGenerator.mfvGenerator.generateMyResult(value.f0, value.f1));
      }

    });

  }
}

Также, пожалуйста, поправьте меня, если я ошибаюсь по этому вопросу.

Ответы [ 2 ]

1 голос
/ 14 января 2020

Использование open обычно является правильным местом для загрузки внешних источников поиска. Тайм-аут немного странный, возможно, вокруг него есть какая-то конфигурация.

Однако, если он огромен, использование загрузчика stati c (либо класса stati c, как вы, либо синглтона) имеет то преимущество, что вам нужно загрузить его только один раз для всех параллельных экземпляров задачи в одном диспетчере задач. Следовательно, вы экономите память и время процессора. Это особенно актуально для вас, поскольку вы используете одну и ту же структуру данных в двух отдельных задачах. Кроме того, загрузчик stati c может быть лениво инициализирован при первом использовании, чтобы избежать тайм-аута в open.

Очевидным недостатком этого подхода является то, что страдает тестируемость вашего кода. Есть несколько способов обойти это, которые я мог бы расширить, если есть интерес.

Я не вижу преимущества использования шаблона сериализатора прокси. Это излишне сложно (пользовательская сериализация в Java) и дает мало преимуществ.

1 голос
/ 14 января 2020

Выполнение этого в методе Open - это 100% правильный способ сделать это. Дает ли Flink исключение тайм-аута или объект?

В качестве последнего метода ditch, вы можете заключить объект в класс, который содержит как объект, так и его строку JSON или Config (сериализуемо ли для Config? ) с помеченным как временный объект, а затем переопределить методы ReadObject / WriteObject для вызова конструктора. Если сам объект mGenerator не имеет состояния (и у вас будут другие проблемы, если это не так), код сериализации должен вызываться только один раз, когда задания распределяются менеджерам задач.

...