ОШИБКА TransportResponseHandler: по-прежнему ожидают 3 запроса при соединении с узла - PullRequest
0 голосов
/ 02 апреля 2019

Я хочу выполнить некоторые операции, используя RDD.map, и он работает в искровой нити. Тем не менее, он добавляет ошибки при добавлении к нему цикла for. Я хочу знать почему и как это исправить .

Когда я добавляю for (walkCount...), spark выдает следующую ошибку:

java.io.FileNotFoundException: /home/xxx/usr/hadoop-2.7.3/tmp/nm-local-dir/usercache/xxx/appcache/application_1554174196597_0019/blockmgr-ac0eb809-641a-437a-a2f0-223084771848/1f/temp_shuffle_303f490a-6e1b-46a2-ae98 -3e3460218bbf (Слишком много открытых файлов)

...

19/04/02 19:41:53 ОШИБКА TransportResponseHandler: Все еще есть 3 невыполненные запросы при закрытии соединения с узла 6 / ip: 40762 19/04/02 19:41:53 ИНФОРМАЦИЯ RetryBlockFetcher: повторная выборка (1/3) для 1 выдающийся блок аф ...

Код ниже. Работает без for (walkCount...).

def randomWalk(): RDD[Array[Long]]=  {//get a sequence of nodes(Long type) from a multilayer graph    
  var randomWalk = initialWalk.map { case (nodeId, clickNode) =>
    ...
    (nodeId, pathBuffer, layer)//nodeId:Long;pathBuffer:ArrayBuffer[Long];layer:Int
  }.persist(persistLevel)//this part is no problem

  for (walkCount <- 0 until 60) {//without this for loop, it works

    randomWalk  = randomWalk.map { case (nodeId, pathBuffer, layer) =>
      val prevNodeId = pathBuffer(pathBuffer.length - 2)//the last two node
      val currentNodeId = pathBuffer.last//the last node
      (s"$prevNodeId $currentNodeId", (nodeId, pathBuffer, layer))
    }.join(indexedEdges).map { case (edge, ((nodeId, pathBuffer, currentLayer), dstNeighbors)) =>//indexedEdges is RDD[(s"$prevNodeId $currentNodeId", dstNeighbors(currentNodeId's neighbors))]
      try {
        //dstNeighbors:Array[(neighborId:Long, layer:Int, weight:Double, tal:Double)]
        val lastNode = pathBuffer.last
        val nextNode = Graphops.produceNode(dstNeighbors, currentLayer, lastNode)//Array[(nextNodeId:Long, newLayer:Int)], size is 1; choose next node, also consider whether changes layer, this function involves math.random and a constant, Graphops.q
        require(nextNode.length == 1, "nextNode.length != 1") 
        pathBuffer.append(nextNode(0)._1)
        (nodeId, pathBuffer, nextNode(0)._2)
      } catch {
        case e: Exception => throw new RuntimeException(e.getMessage)
      }
    }.persist(persistLevel)
  }//this correspond to for loop
  randomWalk.map(_._2.toArray)
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...