Кратчайший путь с помощью Pyspark
Входные данные можно интерпретировать как график со связями между currentnode
и childnode
. Тогда возникает вопрос: каков кратчайший путь между узлом root и всеми листовыми узлами и называется кратчайший путь от одного источника .
Spark имеет Graphx для обработки параллельных вычислений графиков. К сожалению, GraphX не предоставляет Python API (более подробную информацию можно найти здесь ). Библиотека графиков с поддержкой Python: GraphFrames . GraphFrames использует части GraphX.
И GraphX, и GraphFrames предоставляют решение для sssp. К сожалению, обе реализации возвращают только длину кратчайших путей, а не сами пути ( GraphX и GraphFrames ). Но этот ответ предоставляет реализацию алгоритма для GraphX и Scala, который также возвращает пути. Все три решения используют Pregel .
Перевод вышеупомянутого ответа на GraphFrames / Python:
1. Подготовка данных
Укажите уникальные идентификаторы для всех узлов и измените имена столбцов, чтобы они соответствовали описанным здесь именам здесь
df = ...
vertices = df.select("currentnode").withColumnRenamed("currentnode", "node").union(df.select("childnode")).distinct().withColumn("id", F.monotonically_increasing_id()).cache()
edges = df.join(vertices, df.currentnode == vertices.node).drop(F.col("node")).withColumnRenamed("id", "src")\
.join(vertices, df.childnode== vertices.node).drop(F.col("node")).withColumnRenamed("id", "dst").cache()
Nodes Edges
+------+------------+ +-----------+---------+------------+------------+
| node| id| |currentnode|childnode| src| dst|
+------+------------+ +-----------+---------+------------+------------+
| leaf2| 17179869184| | child1| leaf4| 25769803776|249108103168|
|child1| 25769803776| | child1| child3| 25769803776| 68719476736|
|child3| 68719476736| | child1| leaf2| 25769803776| 17179869184|
| leaf6|103079215104| | child3| leaf6| 68719476736|103079215104|
| root|171798691840| | child3| leaf5| 68719476736|214748364800|
| leaf5|214748364800| | root| child1|171798691840| 25769803776|
| leaf4|249108103168| +-----------+---------+------------+------------+
+------+------------+
2. Создайте GraphFrame
from graphframes import GraphFrame
graph = GraphFrame(vertices, edges)
3. Создайте UDF, которые будут формировать отдельные части алгоритма Прегеля.
Тип сообщения:
from pyspark.sql.types import *
vertColSchema = StructType()\
.add("dist", DoubleType())\
.add("node", StringType())\
.add("path", ArrayType(StringType(), True))
Программа вершин:
def vertexProgram(vd, msg):
if msg == None or vd.__getitem__(0) < msg.__getitem__(0):
return (vd.__getitem__(0), vd.__getitem__(1), vd.__getitem__(2))
else:
return (msg.__getitem__(0), vd.__getitem__(1), msg.__getitem__(2))
vertexProgramUdf = F.udf(vertexProgram, vertColSchema)
Исходящие сообщения:
def sendMsgToDst(src, dst):
srcDist = src.__getitem__(0)
dstDist = dst.__getitem__(0)
if srcDist < (dstDist - 1):
return (srcDist + 1, src.__getitem__(1), src.__getitem__(2) + [dst.__getitem__(1)])
else:
return None
sendMsgToDstUdf = F.udf(sendMsgToDst, vertColSchema)
Агрегирование сообщений:
def aggMsgs(agg):
shortest_dist = sorted(agg, key=lambda tup: tup[1])[0]
return (shortest_dist.__getitem__(0), shortest_dist.__getitem__(1), shortest_dist.__getitem__(2))
aggMsgsUdf = F.udf(aggMsgs, vertColSchema)
4. Объедините части
from graphframes.lib import Pregel
result = graph.pregel.withVertexColumn(colName = "vertCol", \
initialExpr = F.when(F.col("node")==(F.lit("root")), F.struct(F.lit(0.0), F.col("node"), F.array(F.col("node")))) \
.otherwise(F.struct(F.lit(float("inf")), F.col("node"), F.array(F.lit("")))).cast(vertColSchema), \
updateAfterAggMsgsExpr = vertexProgramUdf(F.col("vertCol"), Pregel.msg())) \
.sendMsgToDst(sendMsgToDstUdf(F.col("src.vertCol"), Pregel.dst("vertCol"))) \
.aggMsgs(aggMsgsUdf(F.collect_list(Pregel.msg()))) \
.setMaxIter(10) \
.setCheckpointInterval(2) \
.run()
result.select("vertCol.path").show(truncate=False)
Примечания:
maxIter
должно быть установлено на значение, по крайней мере, такое же большое, как самый длинный путь. Если значение больше, результат не изменится, но время вычислений станет больше. Если значение слишком мало, в результате будут отсутствовать более длинные пути. Текущая версия GraphFrames (0.8.0) не поддерживает остановку l oop, когда больше не отправляются новые сообщения. Для checkpointInterval
должно быть установлено значение меньше maxIter
. Фактическое значение зависит от данных и доступного оборудования. Когда возникает исключение OutOfMemory или сеанс Spark зависает на некоторое время, значение может быть уменьшено.
Конечный результат - обычный фрейм данных с содержимым
+-----------------------------+
|path |
+-----------------------------+
|[root, child1] |
|[root, child1, leaf4] |
|[root, child1, child3] |
|[root] |
|[root, child1, child3, leaf6]|
|[root, child1, child3, leaf5]|
|[root, child1, leaf2] |
+-----------------------------+
При необходимости нелистовые узлы могут быть отфильтрованы здесь.