Spark RDD рекурсивные операции на простом сборе - PullRequest
0 голосов
/ 29 апреля 2018

У меня есть информация о пользователях в СДР:

(Id:10, Name:bla, Adress:50, ...)

И у меня есть еще одна коллекция, содержащая последовательные изменения личности, которые мы собрали для каждого пользователя.

(lastId, newId)
    (10, 43)
    (85, 90)
    (43, 50)

Мне нужно получить последний идентификатор для идентификатора каждого пользователя, в этом примере:

getFinalIdentity(10) = 50     (10 -> 43 -> 50)

Некоторое время я использовал широковещательную переменную, содержащую эти тождества, и перебрал коллекцию, чтобы получить окончательный идентификатор. Все работало нормально, пока ссылка не стала слишком большой, чтобы поместиться в широковещательную переменную ...

Я придумал решение, использующее RDD для хранения идентификаторов и рекурсивно повторяющее его, но оно не очень быстрое и выглядит для меня очень сложным.

Есть ли элегантный и быстрый способ сделать это?

1 Ответ

0 голосов
/ 30 апреля 2018

Вы думали о графиках?

Вы можете создать графики из списка ребер как (lastId, newId). Таким образом, узлы без исходящих ребер являются окончательным идентификатором для узлов, у которых нет входящих ребер.

Это можно сделать в Spark с GraphX.

Ниже приведен пример. Для каждого идентификатора отображается идентификатор первого идентификатора в цепочке. Это означает, что для этого изменения идентификаторов (1 -> 2 -> 3) результат будет (1, 1), (2, 1), (3, 1)

import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, VertexId}
import org.apache.spark.{SparkConf, SparkContext}

object Main {

  val conf = new SparkConf().setAppName("myapp").setMaster("local[*]")
  val sc = new SparkContext(conf)

  def main(args: Array[String]): Unit = {

    sc.setLogLevel("ERROR")

    // RDD of pairs (oldId, newId)
    val changedIds = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 4L), (10L, 20L), (20L, 31L), (30L, 40L), (100L, 200L), (200L, 300L)))

    // case classes for pregel operation
    case class Value(originId: VertexId)      // vertex value
    case class Message(value: VertexId)       // message sent from one vertex to another

    // Create graph from id pairs
    val graph = Graph.fromEdgeTuples(changedIds, Value(0))

    // Initial message will be sent to all vertexes at the start
    val initialMsg = Message(0)

    // How vertex should process received message
    def onMsgReceive(vertexId: VertexId, value: Value, msg: Message): Value = {
      // Initial message will have value 0. In that case current vertex need to initialize its value to its own ID
      if (msg.value == 0) Value(vertexId)
      // Otherwise received value is initial ID
      else Value(msg.value)
    }

    // How vertexes should send messages
    def sendMsg(triplet: EdgeTriplet[Value, Int]): Iterator[(VertexId, Message)] = {
      // For the triplet only single message shall be sent to destination vertex
      // Its payload is source vertex origin ID
      Iterator((triplet.dstId, Message(triplet.srcAttr.originId)))
    }

    // How incoming messages to one vertex should be merged
    def mergeMsg(msg1: Message, msg2: Message): Message = {
      // Generally for this case it's an error
      // Because one ID can't have 2 different originIDs
      msg2    // Just return any of the incoming messages
    }

    // Kick out pregel calculation
    val res = graph
      .pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(onMsgReceive, sendMsg, mergeMsg)

    // Print results
    res.vertices.collect().foreach(println)
  }
}

Вывод: (finalId firstId)

(100,Value(100))
(4,Value(1))
(300,Value(100))
(200,Value(100))
(40,Value(30))
(20,Value(10))
(1,Value(1))
(30,Value(30))
(10,Value(10))
(2,Value(1))
(3,Value(1))
(31,Value(10))
...