Поведение Custom Partitioner в Apache Flink - PullRequest
0 голосов
/ 13 марта 2019

Я играю с разметкой Graph, используя библиотеку Gelly от Apache Flink (версия 1.7.2). Тем не менее, я наблюдаю поведение, которое я не до конца понимаю.

Вот код, который использует алгоритм PageRank, предоставленный в библиотеке Gelly.

    DataSet<Edge<Long, Double>> data = env.readTextFile(edgesInputPath).setParallelism(1).map(new MapFunction<String, Edge<Long, Double>>() {

        @Override
        public Edge<Long, Double> map(String s) {
            String[] fields = s.split("\\t");
            long src = Long.parseLong(fields[0]);
            long trg = Long.parseLong(fields[1]);
            return new Edge<>(src, trg, 1.0);
        }
    }).setParallelism(1);

    partitioner = new ImbalancedPartitioner<>(new CustomKeySelector(0));

    Graph<Long, Double, Double> graph = Graph.fromDataSet(data.partitionCustom(partitioner, new CustomKeySelector<>(0)),new InitVertices(1.0), env);

    DataSet<PageRank.Result<Long>> result = graph.run(new PageRank<Long, Double, Double>(0.85, maxIterations));

    result.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE);

Здесь ImbalancePartitioner реализует функцию partition (), которая всегда возвращает 0. Таким образом, все ребра графа будут в одном разделе. Общий уровень параллелизма установлен на 3.

Однако, когда я вижу график выполнения на информационной панели Flink и наблюдаю за записями, обработанными различными исполнителями / экземплярами операторов, я вижу, что за пределами точки все исполнители обрабатывают примерно одинаковое количество записей. Я ожидал, что только один исполнитель будет выполнять всю обработку, поскольку только один раздел должен иметь все ребра.

Второй и третий этапы имеют дисбаланс, который я ожидал увидеть, как показано здесь. Second stage

Third Stage Но начиная с четвертого этапа все экземпляры операторов обрабатывают примерно одинаковое количество записей. Например, вот статистика 4-го этапа. Fourth Stage

Хотя это может быть трудно увидеть, вот график выполнения Execution Graph

Что мне здесь не хватает? Если вы хотите проверить программу самостоятельно. Код доступен в этом репо . Я предоставил небольшой входной файл с краями, который меньше того, который я использую, но проблема все еще видна.

...