Вы думали о графиках?
Вы можете создать графики из списка ребер как (lastId, newId)
. Таким образом, узлы без исходящих ребер являются окончательным идентификатором для узлов, у которых нет входящих ребер.
Это можно сделать в Spark с GraphX.
Ниже приведен пример. Для каждого идентификатора отображается идентификатор первого идентификатора в цепочке. Это означает, что для этого изменения идентификаторов (1 -> 2 -> 3)
результат будет (1, 1), (2, 1), (3, 1)
import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, VertexId}
import org.apache.spark.{SparkConf, SparkContext}
object Main {
val conf = new SparkConf().setAppName("myapp").setMaster("local[*]")
val sc = new SparkContext(conf)
def main(args: Array[String]): Unit = {
sc.setLogLevel("ERROR")
// RDD of pairs (oldId, newId)
val changedIds = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 4L), (10L, 20L), (20L, 31L), (30L, 40L), (100L, 200L), (200L, 300L)))
// case classes for pregel operation
case class Value(originId: VertexId) // vertex value
case class Message(value: VertexId) // message sent from one vertex to another
// Create graph from id pairs
val graph = Graph.fromEdgeTuples(changedIds, Value(0))
// Initial message will be sent to all vertexes at the start
val initialMsg = Message(0)
// How vertex should process received message
def onMsgReceive(vertexId: VertexId, value: Value, msg: Message): Value = {
// Initial message will have value 0. In that case current vertex need to initialize its value to its own ID
if (msg.value == 0) Value(vertexId)
// Otherwise received value is initial ID
else Value(msg.value)
}
// How vertexes should send messages
def sendMsg(triplet: EdgeTriplet[Value, Int]): Iterator[(VertexId, Message)] = {
// For the triplet only single message shall be sent to destination vertex
// Its payload is source vertex origin ID
Iterator((triplet.dstId, Message(triplet.srcAttr.originId)))
}
// How incoming messages to one vertex should be merged
def mergeMsg(msg1: Message, msg2: Message): Message = {
// Generally for this case it's an error
// Because one ID can't have 2 different originIDs
msg2 // Just return any of the incoming messages
}
// Kick out pregel calculation
val res = graph
.pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(onMsgReceive, sendMsg, mergeMsg)
// Print results
res.vertices.collect().foreach(println)
}
}
Вывод: (finalId firstId)
(100,Value(100))
(4,Value(1))
(300,Value(100))
(200,Value(100))
(40,Value(30))
(20,Value(10))
(1,Value(1))
(30,Value(30))
(10,Value(10))
(2,Value(1))
(3,Value(1))
(31,Value(10))