В чем разница между изменением входных аргументов и созданием нового объекта в Vprog из spark graphx - PullRequest
0 голосов
/ 23 апреля 2020

есть моя программа:

static class Vprog extends AbstractFunction3< Object, OddRange, OddRange, OddRange> implements Serializable {
    @Override
    public OddRange apply(Object l, OddRange self, OddRange sumOdd) {
        System.out.println(self.getS()+self.getI()+" ---> "+sumOdd.getS()+sumOdd.getI());
        self.setS(sumOdd.getS() + self.getS());
        self.setI(self.getI() + sumOdd.getI());
        return new OddRange(self.getS(), self.getI());
     }
}

вопрос если я использую return new OddRange, как указано выше в классе Vprog , Я могу изменить vertexRDD

Но, если я использую retuen self , например:

static class Vprog extends AbstractFunction3< Object, OddRange, OddRange, OddRange> implements Serializable {
    @Override
    public OddRange apply(Object l, OddRange self, OddRange sumOdd) {
        System.out.println(self.getS()+self.getI()+" ---> "+sumOdd.getS()+sumOdd.getI());
        self.setS(sumOdd.getS() + self.getS());
        self.setI(self.getI() + sumOdd.getI());
        return self;
    }
}

vertexRDD не изменился. Я знаю, что RDD неизменен, но как я могу обновить vectexRDD в spark.graphx.pregel правильно? Можете ли вы дать мне какой-либо совет?

Я нашел тот же вопрос: Spark Pregel не работает с Java Но я использую spark 2.3.0 , может быть, у него такая же проблема?

1 Ответ

0 голосов
/ 26 апреля 2020

Я думаю, что нашел ответ: Мы должны вернуть новый, если мы хотим изменить данные, которые будут использоваться в следующем sendMsg в Vprog.
, потому что Vprog изменяет vertexRDD, но sendMsg использует tripletsRDD. Более того, вершины в tripletsRDD не равны vertexRDD, это просто копия vertexRDD. Итак, проблема в том, когда обновлять verteies в tripletsRDD при изменении vertexRDD.

Мы можем проследить источник ниже, чтобы выяснить причину:
первая часть: pregel (в Pregel. scala) -> joinVertices (в GraphOps.scala) -> outerJoinVertices (в GraphImpl. scala) -> diff (в VertexRddImpl. scala)
А затем:
вторая часть : pregel (в Pregel. scala) -> mapReduceTriplets (в GraphXUtils. scala) -> aggregateMessagesWithActiveSet (в GraphImpl. scala).

В первой части я обнаружил, что Vprog будет сравнивать данные VertexRDD до и после выполнения. ТАК, если он будет изменен на исходных данных, они будут одинаковыми. Затем будет сгенерирована структура данных с именем replicatedVertexView для хранения различной информации VertexRDD. Если они одинаковые, ничего не будет сохранено.
Во второй части будет обновлен tripletsRDD с информацией, хранящейся в relicatedVertexView. А затем используйте tripletsRDD в sendMsg.
Итак, если мы не вернем new в Vprog, tripletsRDD не будет изменен с VertexRDD, и результаты будут неправильными.

...