Искра Нет места на устройстве при работе с очень большими данными - PullRequest
0 голосов
/ 04 июня 2018

Ниже приведен мой код искры scala:

val vertex = graph.vertices
val edges = graph.edges.map(v=>(v.srcId, v.dstId)).toDF("key","value")
var FMvertex = vertex.map(v => (v._1, HLLCounter.encode(v._1)))
var encodedVertex = FMvertex.toDF("keyR", "valueR")

var Degvertex = vertex.map(v => (v._1, 0.toLong))
var lastRes = Degvertex
//calculate FM of the next step
breakable {
  for (i <- 1 to MaxIter) {
    var N_pre = FMvertex.map(v => (v._1, HLLCounter.decode(v._2)))
    var adjacency = edges.join(
      encodedVertex,//FMvertex.toDF("keyR", "valueR"),
      $"value" === $"keyR"
    ).rdd.map(r => (r.getAs[VertexId]("key"), r.getAs[Array[Byte]]("valueR"))).reduceByKey((a,b)=>HLLCounter.Union(a,b))
    FMvertex = FMvertex.union(adjacency).reduceByKey((a,b)=>HLLCounter.Union(a,b))

    // update vetex encode
    encodedVertex = FMvertex.toDF("keyR", "valueR")

    var N_curr = FMvertex.map(v => (v._1, HLLCounter.decode(v._2)))
    lastRes = N_curr
    var middleAns = N_curr.union(N_pre).reduceByKey((a,b)=>Math.abs(a-b))//.mapValues(x => x._1 - x._2)
    if (middleAns.values.sum() == 0){
      println(i)
      break
    }
    Degvertex = Degvertex.join(middleAns).mapValues(x => x._1 + i * x._2)//.map(identity)
  }
}
val res = Degvertex.join(lastRes).mapValues(x => x._1.toDouble / x._2.toDouble)
return res

, в котором я использую несколько функций, определенных в Java:

    import net.agkn.hll.HLL;
import com.google.common.hash.*;
import com.google.common.hash.Hashing;

import java.io.Serializable;

public class HLLCounter implements Serializable {
    private static int seed = 1234567;
    private static HashFunction hs = Hashing.murmur3_128(seed);

    private static int log2m = 15;
    private static int regwidth = 5;


    public static byte[] encode(Long id) {
        HLL hll = new HLL(log2m, regwidth);
        Hasher myhash = hs.newHasher();
        hll.addRaw(myhash.putLong(id).hash().asLong());
        return hll.toBytes();
    }

    public static byte[] Union(byte[] byteA, byte[] byteB) {
        HLL hllA = HLL.fromBytes(byteA);
        HLL hllB = HLL.fromBytes(byteB);
        hllA.union(hllB);
        return hllA.toBytes();
    }

    public static long decode(byte[] bytes) {
        HLL hll = HLL.fromBytes(bytes);
        return hll.cardinality();
    }
}

Этот код используется для расчета эффективной близости набольшой график, и я использовал пакет Hyperloglog.

Код работает нормально, когда я запустил его на графе с примерно десятью миллионами вершин и сотнями миллионов ребер.Однако, когда я запустил его на графике с тысячами миллионов графиков и миллиардами ребер, после нескольких часов работы на кластерах он показывает

Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 91 in stage 29.1 failed 4 times, most recent failure: Lost task 91.3 in stage 29.1 (TID 17065, 9.10.135.216, executor 102): java.io.IOException: : No space left on device
 at java.io.FileOutputStream.writeBytes(Native Method)
 at java.io.FileOutputStream.write(FileOutputStream.java:326)
 at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
 at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)

Кто-нибудь может мне помочь?Я просто начал использовать искру в течение нескольких дней.Спасибо за помощь.

1 Ответ

0 голосов
/ 06 июня 2018

Сяотянь, вы утверждаете, что "чтение в случайном порядке и запись в случайном порядке составляет около 1 ТБ. Мне не нужны эти промежуточные значения или СДР".Это утверждение подтверждает, что вы не знакомы с Apache Spark или, возможно, с алгоритмом, который вы используете.Пожалуйста, позвольте мне объяснить.

При добавлении трех чисел, вы должны сделать выбор первых двух чисел, чтобы добавить.Например (a + b) + c или a + (b + c).После того, как этот выбор сделан, в скобках содержится временное промежуточное значение.Невозможно продолжить вычисление для всех трех чисел без промежуточного числа.

СДР - это структура данных с эффективным использованием пространства.Каждый «новый» RDD представляет собой набор операций для всего набора данных.Некоторые СДР представляют собой одну операцию, например «добавить пять», в то время как другие представляют цепочку операций, например «добавить пять, затем умножить на шесть и вычесть на семь».Вы не можете отказаться от СДР, не отказавшись от какой-либо части вашего математического алгоритма.

По своей сути Apache Spark является алгоритмом рассеяния.Он распределяет набор данных по ряду рабочих узлов, где этот набор данных является частью единого RDD, который распространяется вместе с необходимыми вычислениями.На данный момент вычисления еще не выполнены.Поскольку данные запрашиваются из вычисленной формы СДР, вычисления выполняются по требованию.

Иногда невозможно завершить вычисления на одном работнике, не зная некоторых промежуточных значений из другихработников.Этот вид перекрестной связи между работниками всегда происходит между головным узлом, который распределяет данные среди разных работников и собирает и объединяет данные от разных работников;но, в зависимости от того, как устроен алгоритм, он также может происходить в середине вычислений (особенно в алгоритмах, которые groupBy или объединяют срезы данных).

У вас есть алгоритм, который требует перестановки, таким образом, что одинузел не может собрать результаты от всех других узлов, потому что у одного узла недостаточно оперативной памяти для хранения промежуточных значений, собранных из других узлов.

Короче говоря, у вас есть алгоритм, который не может масштабироваться в соответствии с размером вашего набора данных с имеющимся у вас оборудованием.

На этом этапе вам необходимо вернуться кАлгоритм Apache Spark и посмотрите, можно ли

  1. Настроить разделы в RDD, чтобы уменьшить перекрестные помехи (слишком малые разделы могут потребовать большего количества перекрестных разговоров при перетасовке в качестве полностью подключенной взаимной передачиувеличивается в O (N ^ 2), слишком большие разделы могут исчерпать оперативную память в вычислительном узле).
  2. Перестройте алгоритм так, чтобы полная перестановка не требовалась (иногда вы можете сократить поэтапно, так чтоВы имеете дело с большим количеством фаз сокращения, причем каждая фаза объединяет меньше данных.
  3. Перестройте алгоритм так, чтобы не требовалось перемешивание (это возможно, но маловероятно, что алгоритм просто неправильно написан, и с учетом егоиначе можно избежать запроса удаленных данных с точки зрения узла).
  4. Если проблема в cОтбирая результаты, перепишите алгоритм, чтобы он возвращал результаты не в консоли головного узла, а в распределенной файловой системе, которая может вместить данные (например, HDFS).

Без гаек и болтоввашей программы Apache Spark и доступа к вашему набору данных, а также к вашему кластеру Spark и его журналам, трудно понять, какой из этих общих подходов принесет вам наибольшую пользу;поэтому я перечислил их все.

Удачи!

...