Apache Flink: В чем разница между setParallelism () и setMaxParallelism () - PullRequest
0 голосов
/ 06 февраля 2019

Я попытался установить максимальный параллелизм для задания Flink, используя метод ExecutionConfig.setMaxParallelism(), но, похоже, он не работал.

Я также изменил стандартный пример WordCount, чтобы выполнить несколько тестов, ипохоже, что метод setMaxParallelism() не влияет ни на локальную среду, ни на отдельный кластер.

Как работает setMaxParallelism()?

Ответы [ 2 ]

0 голосов
/ 08 февраля 2019

Сегодня я провел еще несколько тестов, используя поток вместо набора данных.На этот раз я увидел эффект setMaxParallelism ().

    public static void main(String[] args) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setMaxParallelism(4); // <-- effect

        DataStream<String> text = env.fromElements(WORDS);

        DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);

        counts.writeAsCsv("test.dat");

        env.execute("WordCount Example");
    }

Интересная ошибка, которую увидел клиент,

Caused by: org.apache.flink.runtime.JobException: Vertex Flat Map's parallelism (8) is higher than the max parallelism (4). Please lower the parallelism or increase the max parallelism.
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:188)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:830)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
        at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1152)
        at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1132)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
        ... 10 more

Спасибо

0 голосов
/ 08 февраля 2019

Flink предоставляет две настройки:

  • setParallelism(x) устанавливает параллелизм задания или оператора на x, т. Е. Количество параллельных задач для операторов.
  • setMaxParallelism(y) контролирует максимальное количество задач, на которые может быть распределено ключевое состояние, т. Е. Максимально эффективный параллелизм оператора.У оператора все еще может быть больше задач, но только y из них будет иметь назначенное состояние ключа и могут быть использованы для обработки.Единица распределения состояния ключа называется группами ключей.

Документация объясняет концепции более подробно.

...