Spark GraphFrames High Shuffle, чтение / запись - PullRequest
0 голосов
/ 21 июня 2020

Привет, я создал Graph, используя файлы вершин и ребер. Размер графа 600 ГБ. Я запрашиваю этот график, используя функцию мотивов Spark GraphFrames. Я установил кластер AWS EMR для запроса графа.

детали кластера: - 1 главный и 8 подчиненных

Главный узел:

    m5.xlarge
    4 vCore, 16 GiB memory, EBS only storage
    EBS Storage:64 GiB

Подчиненный узел:

    m5.4xlarge
    16 vCore, 64 GiB memory, EBS only storage
    EBS Storage:256 GiB (per instance)

Я сталкиваюсь с очень большим количеством случайных операций чтения (3,4 ТБ) и записи (2 ТБ), это влияет на производительность, и для выполнения всего 10 запросов требуется около 50 минут. Есть ли способ уменьшить такой высокий уровень перемешивания .

Вот мой искровой код: -

val spark = SparkSession.builder.appName("SparkGraph POC").getOrCreate()

val g:GraphFrame  = GraphFrame(vertexDf, edgeDf)

//queries

    val q1 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c)")

    q1.filter(
      " r1.relationship = 'knows' and" +
                  " r2.relationship = 'knows'").distinct()
      .createOrReplaceTempView("q1table")

    spark.sql("select a.id as a_id,a.name as a_name," +
                      "b.id as b_id,b.name as b_name," +
                      "c.id as c_id,c.name as c_name from q1table")
      .write
      .option("quote", "\"")
      .option("escape", "\"")
      .option("header","true")
      .csv(resFilePath + "/q1")

    spark.catalog.uncacheTable("q1table")

    val q2 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c); (c)-[r3]->(d); (d)-[r4]->(e)")
    q2.filter(
      " a.name = 'user1' and" +
        " e.name = 'user4' and" +
        " r1.relationship = 'knows' and" +
        " r2.relationship = 'knows' and" +
        " r3.relationship = 'knows' and" +
        " r4.relationship = 'knows'").distinct()
      .createOrReplaceTempView("q2table")

    spark.sql("select a.id as a_id, a.name as a_name ," +
      "e.id as e_id, e.name as e_name from q2table")
      .write
      .option("quote", "\"")
      .option("escape", "\"")
      .option("header","true")
      .csv(resFilePath + "/q2")

    spark.catalog.uncacheTable("q2table")

spark.stop()

введите описание изображения здесь

1 Ответ

0 голосов
/ 21 июня 2020

Проблема с реализацией Graphframes состоит в том, что она выполняет самостоятельные соединения внутренних фреймов данных столько раз, сколько вы используете для мотивов. Это означает, что по мере увеличения длины цепочки у вас будет больше перетасовок

Вы можете увидеть более подробную информацию на https://www.waitingforcode.com/apache-spark-graphframes/motifs-finding-graphframes/read

Я также пробовал аналогичный подход и увидел, что, когда длина цепочки больше 12, Spark начинает не реагировать, а соединения с исполнителями теряются, даже если я увеличил ресурсы.

Если вы пытаетесь это сделать, я бы порекомендовал используя вместо этого базу данных графиков.

Надеюсь, это поможет

...