Зачем спарк делить все данные в одном исполнителе? - PullRequest
1 голос
/ 25 марта 2019

Я работаю с Spark GraphX. Я строю график из файла (около 620 МБ, 50К вершин и почти 50 миллионов ребер). Я использую искровой кластер с: 4 рабочими, каждый с 8 ядрами и 13,4 г оперативной памяти, 1 драйвер с теми же характеристиками. Когда я отправляю свой .jar в кластер, случайно один из рабочих загружает все данные на него. Все задачи, необходимые для вычислений, запрашиваются этим работником. В то время как вычисления оставшиеся три бездействуют. Я попробовал все, и я не нашел ничего, что могло бы заставить вычислять всех работников.

Когда Spark строит график, и я ищу количество разделов RDD вершин, скажем, 5, но если я перераспределю этот RDD, например, с 32 (общее количество ядер), Spark загрузит данные в каждого работника, но получит замедлить вычисления.

Я запускаю искру, отправленную таким образом:

spark-submit --master spark://172.30.200.20:7077 --driver-memory 12g --executor-memory 12g --class interscore.InterScore /root/interscore/interscore.jar hdfs://172.30.200.20:9000/user/hadoop/interscore/network.dat hdfs://172.30.200.20:9000/user/hadoop/interscore/community.dat 111

Код здесь:

object InterScore extends App{
  val sparkConf = new SparkConf().setAppName("Big-InterScore")
  val sc = new SparkContext(sparkConf)

  val t0 = System.currentTimeMillis
  runInterScore(args(0), args(1), args(2))
  println("Running time " + (System.currentTimeMillis - t0).toDouble / 1000)

  sc.stop()

  def runInterScore(netPath:String, communitiesPath:String, outputPath:String) = {
    val communities = sc.textFile(communitiesPath).map(x => {
      val a = x.split('\t')
      (a(0).toLong, a(1).toInt)
    }).cache

    val graph = GraphLoader.edgeListFile(sc, netPath, true)
      .partitionBy(PartitionStrategy.RandomVertexCut)
      .groupEdges(_ + _)
      .joinVertices(communities)((_, _, c) => c)
      .cache

    val lvalues = graph.aggregateMessages[Double](
      m => {
          m.sendToDst(if (m.srcAttr != m.dstAttr) 1 else 0)
          m.sendToSrc(if (m.srcAttr != m.dstAttr) 1 else 0)
      }, _ + _)

    val communitiesIndices = communities.map(x => x._2).distinct.collect
    val verticesWithLValue = graph.vertices.repartition(32).join(lvalues).cache
    println("K = " + communitiesIndices.size)
    graph.unpersist()
    graph.vertices.unpersist()
    communitiesIndices.foreach(c => {
    //COMPUTE c
      }
    })
  }
}
...