График производительности PageRank: PySpark vs sparklyr - PullRequest
0 голосов
/ 05 октября 2018

Я использую Spark / GraphFrames из Python и R. Когда я вызываю PageRank на небольшом графике из Python, это происходит намного медленнее, чем в R. Почему это намного медленнее с Python, учитывая, что обаPython и R вызывают одни и те же библиотеки?

Я попытаюсь продемонстрировать проблему ниже.

Spark / GraphFrames включает примеры графиков, таких как friends , как описано на этой ссылке .Это очень маленький ориентированный граф с 6 узлами и 8 ребрами (обратите внимание, что пример отличается от других версий GraphFrames).

enter image description here

Когда я запускаю следующий фрагмент кода с помощью R, вычисление PageRank занимает почти не время:

library(graphframes)
library(sparklyr)
library(dplyr)

nodes <- read.csv('nodes.csv')
edges <- read.csv('edges.csv')

sc <- spark_connect(master = "local", version = "2.1.1")

nodes_tbl <- copy_to(sc, nodes)
edges_tbl <- copy_to(sc, edges)

graph <- gf_graphframe(nodes_tbl, edges_tbl)
ranks <- gf_pagerank(graph, reset_probability = 0.15, tol = 0.01)
print(ranks$vertices)

results <- as.data.frame(ranks$vertices)
results <- arrange(results, id)
results$pagerank <- results$pagerank / sum(results$pagerank)

print(results)

Когда я запускаю эквивалент с PySpark, это занимает от 10 до 30 минут:

from pyspark.sql import SparkSession
from graphframes.examples import Graphs

if __name__ == '__main__':

    sc = SparkSession.builder.master("local").getOrCreate()
    g = Graphs(sc).friends()
    results = g.pageRank(resetProbability=0.15, tol=0.01)
    results.vertices.select("id", "pagerank").show()
    results.edges.select("src", "dst", "weight").show()

Я пытался привести разные версии Spark и GraphFrames для Python в соответствие с настройками R.

1 Ответ

0 голосов
/ 06 октября 2018

В общем, когда вы видите такие существенные различия во время выполнения между частями кода, которые, по-видимому, эквивалентны в разных бэкэндах, вы должны рассмотреть две возможности:

  • На самом деле они не эквивалентны.Несмотря на использование одних и тех же библиотек Java внутри системы, пути взаимодействия разных языков с JVM не одинаковы, и когда код достигает JVM, он может не использовать одну и ту же цепочку вызовов.
  • методы эквивалентны, но конфигурация и / или распределение данных не одинаковы.

В этом конкретном случае первая и наиболее очевидная причина - способ загрузки данных.

Однако, насколько я могу судить,скажем, эти параметры не должны влиять на время выполнения в данном конкретном случае.Более того, путь до того, как код достигает бэкэнда JVM в обоих случаях, кажется, не достаточно отличается, чтобы объяснить разницу.

Это говорит о том, что проблема лежит где-то в конфигурации.В общем, есть как минимум две опции, которые могут существенно повлиять на распределение данных и, следовательно, время выполнения:

  • spark.default.parallelism - используется с RDD API для определения количества разделов в разных случаяхВ том числе по умолчанию пост-случайное распределение.Для возможных последствий смотрите, например, Время итерации Spark экспоненциально увеличивается при использовании объединения

    Не похоже, что это влияет на ваш код здесь.

  • spark.sql.shuffle.partitions - используется с Dataset API для определения количества разделов после тасования (groupBy, join и т. Д.).

    В то время как код PageRank использует старый API GraphX, и этот параметр там напрямую не применяется, перед передачей данных в более старый API, включает в себя индексирование ребер и вершин с помощью Dataset API.

    Если вы проверите источник, вы увидите, что оба indexedEdges и indexVertices используют объединения, и, следовательно, зависят от spark.sql.shuffle.partitions.

    Кроме того, число разделов, установленных вышеупомянутыми методами, будет унаследовано объектом GraphX ​​Graph, что существенно повлияет на время выполнения.

    Если для spark.sql.shuffle.partitions установлено минимальное значение:

    spark: SparkSession
    spark.conf.set("spark.sql.shuffle.partitions", 1)
    

    время выполнения для таких небольших данных должно быть незначительным.

Заключение :

В средах могут использоваться различные значения spark.sql.shuffle.partitions.

Общие указания :

Если вы видите подобное поведение и хотите примерно сузить проблему, вам следует взглянуть на интерфейс Spark, иувидеть, где вещи расходятся.В этом случае вы, вероятно, увидите значительно различное количество заданий.

...