Как добавить агрегированные данные в исходный набор данных в Apache Spark? - PullRequest
0 голосов
/ 25 мая 2018

Я пытаюсь выяснить, как объединить данные из набора данных, а затем добавить результат в исходный набор данных с помощью Apache Spark.Я испробовал 2 решения, которые меня не устраивают, и мне интересно, есть ли более масштабируемое и эффективное решение, которого я не вижу.

Вот очень упрощенные примеры входных и ожидаемых выходных данных:

Ввод :

Список клиентов, а для каждого клиента - список приобретенных товаров.

(John, [toast, butter])
(Jane, [toast, jelly])

Вывод :

Список клиентов, для каждого клиента - список приобретенных товаров, а для каждого товара - количество клиентов, купивших этот товар.

(John, [(toast, 2), (butter, 1)])
(Jane, [(toast, 2), (jelly, 1)])

Вот решения, которые я пробовал до сих пор, перечисляя шаги и выходные данные.

Решение № 1:

Start with a pair rdd:
(John, [toast, butter])
(Jane, [toast, jelly])

flatMapToPair:
(toast, John)
(butter, John)
(toast, Jane)
(jelly, Jane)

aggregateByKey: 
(toast, [John, Jane])
(butter, [John])
(jelly, [Jane])

flatMapToPair: (using the size of the list of customers)
(John, [(toast, 2), (butter, 1)])
(Jane, [(toast, 2), (jelly, 1)])

Хотя это работает для небольшого набора данных,Это ужасная идея с более крупной, потому что в какой-то момент вы держите для каждого продукта огромный список клиентов, которые, вероятно, не поместятся в памяти исполнителя.

Решение № 2:

Start with a pair rdd:
(John, [toast, butter])
(Jane, [toast, jelly])

flatMapToPair:
(toast, John)
(butter, John)
(toast, Jane)
(jelly, Jane)

aggregateByKey: (counting customers without creating a list)
(toast, 2)
(butter, 1)
(jelly, 1)

join: (using the two previous results)
(toast, (John, 2))
(butter, (John, 1))
(toast, (Jane, 2))
(jelly, (Jane, 1))

mapToPair:
(John, (toast, 2))
(John, (butter, 1))
(Jane, (toast, 2))
(Jane, (jelly, 1))

aggregateByKey:
(John, [(toast, 2), (butter, 1)])
(Jane, [(toast, 2), (jelly, 1)])

Это решение должно работать, но я чувствую, что должно быть что-то другоеРешение, которое может не включать присоединение СДР.

Существует ли более масштабируемое / эффективное / лучшее "Решение № 3" для этой проблемы?

Ответы [ 2 ]

0 голосов
/ 25 мая 2018

Вот способ dataframe для вас, чтобы попытаться поиграть с

Если у вас уже есть парное rdds , то вызов toDF с именами столбцов должен дать вам dataframe as

val df = pairedRDD.toDF("key", "value")

, который должен быть

+----+---------------+
|key |value          |
+----+---------------+
|John|[toast, butter]|
|Jane|[toast, jelly] |
+----+---------------+

Теперь все, что вам нужно сделать, это explode, groupby, агрегация для подсчетов и сноваexplode, groupby и агрегация для получения исходного набора данных со значениями как

import org.apache.spark.sql.functions._
df.withColumn("value", explode(col("value")))
  .groupBy("value").agg(count("value").as("count"), collect_list("key").as("key"))
  .withColumn("key", explode(col("key")))
  .groupBy("key").agg(collect_list(struct("value", "count")).as("value"))

, что должно дать вам

+----+-----------------------+
|key |value                  |
+----+-----------------------+
|John|[[toast,2], [butter,1]]|
|Jane|[[jelly,1], [toast,2]] |
+----+-----------------------+

Вы можете обработать дальше вdataframe или вернитесь к rdd с помощью .rdd api.

0 голосов
/ 25 мая 2018

Я думаю, что другим подходом было бы использование GraphX.

Вот рабочий код (scala 2.11.12, Spark 2.3.0):

import org.apache.spark.graphx._
import org.apache.spark.sql.SparkSession

object Main {

  private val ss = SparkSession.builder().appName("").master("local[*]").getOrCreate()
  private val sc = ss.sparkContext

  def main(args: Array[String]): Unit = {

    sc.setLogLevel("ERROR")

    // Class for vertex values
    case class Value(name: String, names: List[String], count: Int)
    // Message that is sent from one Vertex to another
    case class Message(names: List[String], count: Int)

    // Simulate input data
    val allData = sc.parallelize(Seq(
      ("John", Seq("toast", "butter")),
      ("Jane", Seq("toast", "jelly"))
    ))

    // Create vertices
    // Goods and People names - all will become vertices
    val vertices = allData.flatMap(pair =>
      pair._2 // Take all goods bought
        .union(Seq(pair._1)) // add name
        .map(v => (v.hashCode.toLong, Value(v, List[String](), 0)))) // (id, Value)

    // Hash codes are required because in GraphX in vertexes requires IDs as Long
    // Create edges: Person --> Bought goods
    val edges = allData
      .flatMap(pair =>
        pair._2 // Take all goods
          .map(goods => Edge[Int](pair._1.hashCode().toLong, goods.hashCode.toLong, 0))) // create pairs of (person, bought_good)

    // Create graph from edges and vertices
    val graph = Graph(vertices, edges)

    // Initial message will be sent to all vertexes at the start
    val initialMsg = Message(List[String](), 0)

    // How vertex should process received message
    def onMsgReceive(vertexId: VertexId, value: Value, msg: Message): Value = {
      if (msg == initialMsg) value // Just ignore initial message
      else Value(value.name, msg.names, msg.count) // Received message already contains all our results
    }

    // How vertexes should send messages
    def sendMsg(triplet: EdgeTriplet[Value, Int]): Iterator[(VertexId, Message)] = {
      // Each vertix sends only one message with it's own name and 1
      Iterator((triplet.dstId, Message(List[String](triplet.srcAttr.name), 1)))
    }

    // How incoming messages to one vertex should be merged
    def mergeMsg(msg1: Message, msg2: Message): Message = {
      // On the goods vertices messages from people who bought them will merge
      // Final message will contain names of all people who bought this good and count of them
      Message(msg1.names ::: msg2.names, msg1.count + msg2.count)
    }

    // Kick out pregel calculation
    val res = graph
      .pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(onMsgReceive, sendMsg, mergeMsg)

    val values = res.vertices
      .filter(v => v._2.count != 0)   // Filter out people - they will not have any incoming edges
      .map(pair => pair._2)           // Also remove IDs

    values      // (good, (List of names, count))
      .flatMap(v => v.names.map(n => (n, (v.name, v.count))))     // transform to (name, (good, count))
      .aggregateByKey(List[(String, Int)]())((acc, v) => v :: acc, (acc1, acc2) => acc1 ::: acc2)   // aggregate by names
      .collect().foreach(println)     // Print the result
  }
}

Возможно, есть лучший способ, каксделать это с тем же подходом, но все же - результат:

=======================================
(Jane,List((jelly,1), (toast,2)))
(John,List((butter,1), (toast,2)))

ОБНОВЛЕНИЕ

Этот второй пример - то, о чем я говорил в комментариях.

import org.apache.spark.graphx._
import org.apache.spark.sql.SparkSession

object Main {

  private val ss = SparkSession.builder().appName("").master("local[*]").getOrCreate()
  private val sc = ss.sparkContext

  def main(args: Array[String]): Unit = {

    sc.setLogLevel("ERROR")

    // Entity and how much it was bought
    case class Entity(name: String, bought: Int)
    // Class for vertex values
    case class Value(name: Entity, names: List[Entity])
    // Message that is sent from one Vertex to another
    case class Message(items: List[Entity])
    // Simulate input data
    val allData = sc.parallelize(Seq(
      ("John", Seq("toast", "butter")),
      ("Jane", Seq("toast", "jelly"))
    ))

    // First calculate how much of each Entity was bought
    val counts = allData
      .flatMap(pair => pair._2.map(v => (v, 1))) // flatten all bought items
      .reduceByKey(_ + _) // count occurrences
      .map(v => Entity(v._1, v._2)) // create items

    // Create vertices
    // Goods and People names - all will become vertices
    val vertices = allData
      .map(pair => Entity(pair._1, 0))    // People are also Entities - but with 0, since they were not bought :)
      .union(counts)                      //
      .map(v => (v.name.hashCode.toLong, Value(Entity(v.name, v.bought), List[Entity]())))      // (key, value)

    // Hash codes are required because in GraphX in vertexes requires IDs as Long
    // Create edges: Entity --> Person
    val edges = allData
      .flatMap(pair =>
        pair._2 // Take all goods
          .map(goods => Edge[Int](goods.hashCode.toLong, pair._1.hashCode().toLong, 0)))

    // Create graph from edges and vertices
    val graph = Graph(vertices, edges)

    // Initial message will be sent to all vertexes at the start
    val initialMsg = Message(List[Entity](Entity("", 0)))

    // How vertex should process received message
    def onMsgReceive(vertexId: VertexId, value: Value, msg: Message): Value = {
      if (msg == initialMsg) value // Just ignore initial message
      else Value(value.name, msg.items) // Received message already contains all results
    }

    // How vertexes should send messages
    def sendMsg(triplet: EdgeTriplet[Value, Int]): Iterator[(VertexId, Message)] = {
      // Each vertex sends only one message with it's own Entity
      Iterator((triplet.dstId, Message(List[Entity](triplet.srcAttr.name))))
    }

    // How incoming messages to one vertex should be merged
    def mergeMsg(msg1: Message, msg2: Message): Message = {
      // On the goods vertices messages from people who bought them will merge
      // Final message will contain names of all people who bought this good and count of them
      Message(msg1.items ::: msg2.items)
    }

    // Kick out pregel calculation
    val res = graph
      .pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(onMsgReceive, sendMsg, mergeMsg)


    res
      .vertices
      .filter(vertex => vertex._2.names.nonEmpty)             // Filter persons
      .map(vertex => (vertex._2.name.name, vertex._2.names))  // Remove vertex IDs
      .collect()      // Print results
      .foreach(println)
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...