Использование широковещательной переменной ИЛИ использование фильтра RDD для вычисления пересечения соседей двух узлов? - PullRequest
0 голосов
/ 03 марта 2020

Я использовал GraphLoader для загрузки моего графика в СДР. у каждого узла в графе есть несколько соседей. главная цель - найти их пересечение и выполнить над ними параллельные и распределенные операции. каждый узел сначала имеет атрибут 1, и я изменил свой атрибут на (метка, Isimportant), используя следующий код:

case class nodes_properties(label:Int, ISimportant:Boolean=false)
var work_graph=graph.mapVertices{case(node,property)=> nodes_properties(node.toInt,false)}

каждый раз, когда любой узел обновляет свою метку, рабочий_граф будет обновляться.

Я использовал 2 метода для нахождения общих соседей (пересечение двух соседних узлов) двух узлов. Я должен упомянуть, что я буду выполнять их на кластере не локально.

neighbors(1)=[2 3 6 9]
neighbors(2)=[1 3 5 9]

intersection(1,2)=(3 9)

первый метод:

val all_neighbors: VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.Either).cache()
val broadcastVar = all_neighbors.collect().toMap
val nvalues = sc.broadcast(broadcastVar)

val common_neighbors=nvalues.value(1).intersect(nvalues.value(2))

common_neighbors.foreach{ 

work_graph=work_graph.mapVertices((vid:VertexId,v:nodes_properties)=> {
 x=>

 if(vid==x) nodes_properties(core_node_label)
 else v })
}

Второй метод:

val all_neighbors: VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.Either).cache()
val common_neighbors2=(all_neighbors.filter(x=>x._1==1)).intersection(all_neighbors.filter(x=>x._1==2))
common_neighbors2.foreach {

work_graph=work_graph.mapVertices((vid:VertexId,v:nodes_properties)=> {
 x=>

 if(vid==x) nodes_properties(core_node_label)
 else v })
}
}

Вопрос:

у меня такой вопрос: какой из перечисленных методов работает параллельно и распределенным образом ??? . я имею в виду, если я использую метод 1 и широковещательную переменную для вычисления общих соседей, будет ли метод foreach для выполнения некоторых операций выполняться на всех ведомых и распределенных устройствах или если я буду использовать метод 2 и использовать фильтр для вычисления общих соседей, а затем выполнение foreach будет запустить в распределенном?

1 Ответ

0 голосов
/ 03 марта 2020

Насколько я вижу, ни один из методов не будет работать параллельно

Первый метод, он будет выполняться в месте расположения графиков, но последовательно, потому что вы вызываете foreach для * 1026. * Map

Второй метод, я думаю, он должен выполнять фильтрацию параллельно (извините, я не работал с GraphX), и тогда он перейдет в последовательное foreach выполнение

Также на Spark's GraphX ​​do c page отмечается, что

Обратите внимание, что collectNeighborIds и collectNeighbors операторы могут быть весьма дорогостоящими поскольку они дублируют информацию и требуют существенного общения. Если возможно, попробуйте выразить то же самое вычисление напрямую, используя оператор mapReduceTriplets .

Я бы предложил перестроить ваш код в соответствии с рекомендациями, приведенными выше, уменьшение карты (я имею в виду mapReduceTriplets) будет выполняться параллельно и распределенным образом наверняка

...