Постройте иерархию из реляционного набора данных с помощью Pyspark - PullRequest
2 голосов
/ 18 июня 2020

Я новичок в Python и застрял на построении иерархии из набора реляционных данных. Было бы очень полезно, если бы у кого-то была идея, как поступить с этим.

У меня есть реляционный набор данных с такими данными, как

_currentnode,  childnode_  
 root,         child1  
 child1,       leaf2  
 child1,       child3  
 child1,       leaf4  
 child3,       leaf5  
 child3,       leaf6  

и так далее. Я ищу код python или pyspark для построить фрейм данных иерархии, как показано ниже

_level1, level2,  level3,  level4_  
root,    child1,  leaf2,   null  
root,    child1,  child3,  leaf5  
root,    child1,  child3,  leaf6  
root,    child1,  leaf4,   null  

Данные являются буквенно-цифровыми и представляют собой огромный набор данных [~ 50 миллионов записей]. Кроме того, root иерархии известен и может быть встроен в код. Итак, в приведенном выше примере root иерархии - это 'root'.

Заранее спасибо, Вардхан.

пс: Спасибо за редактирование :)

1 Ответ

2 голосов
/ 22 июня 2020

Кратчайший путь с помощью Pyspark

Входные данные можно интерпретировать как график со связями между currentnode и childnode. Тогда возникает вопрос: каков кратчайший путь между узлом root и всеми листовыми узлами и называется кратчайший путь от одного источника .

Spark имеет Graphx для обработки параллельных вычислений графиков. К сожалению, GraphX ​​не предоставляет Python API (более подробную информацию можно найти здесь ). Библиотека графиков с поддержкой Python: GraphFrames . GraphFrames использует части GraphX.

И GraphX, и GraphFrames предоставляют решение для sssp. К сожалению, обе реализации возвращают только длину кратчайших путей, а не сами пути ( GraphX ​​ и GraphFrames ). Но этот ответ предоставляет реализацию алгоритма для GraphX ​​и Scala, который также возвращает пути. Все три решения используют Pregel .

Перевод вышеупомянутого ответа на GraphFrames / Python:

1. Подготовка данных

Укажите уникальные идентификаторы для всех узлов и измените имена столбцов, чтобы они соответствовали описанным здесь именам здесь

df = ...

vertices = df.select("currentnode").withColumnRenamed("currentnode", "node").union(df.select("childnode")).distinct().withColumn("id", F.monotonically_increasing_id()).cache()

edges = df.join(vertices, df.currentnode == vertices.node).drop(F.col("node")).withColumnRenamed("id", "src")\
        .join(vertices, df.childnode== vertices.node).drop(F.col("node")).withColumnRenamed("id", "dst").cache() 
Nodes                   Edges
+------+------------+   +-----------+---------+------------+------------+
|  node|          id|   |currentnode|childnode|         src|         dst|
+------+------------+   +-----------+---------+------------+------------+
| leaf2| 17179869184|   |     child1|    leaf4| 25769803776|249108103168|
|child1| 25769803776|   |     child1|   child3| 25769803776| 68719476736|
|child3| 68719476736|   |     child1|    leaf2| 25769803776| 17179869184|
| leaf6|103079215104|   |     child3|    leaf6| 68719476736|103079215104|
|  root|171798691840|   |     child3|    leaf5| 68719476736|214748364800|
| leaf5|214748364800|   |       root|   child1|171798691840| 25769803776|
| leaf4|249108103168|   +-----------+---------+------------+------------+
+------+------------+   

2. Создайте GraphFrame

from graphframes import GraphFrame
graph = GraphFrame(vertices, edges)

3. Создайте UDF, которые будут формировать отдельные части алгоритма Прегеля.

Тип сообщения:
from pyspark.sql.types import *
vertColSchema = StructType()\
      .add("dist", DoubleType())\
      .add("node", StringType())\
      .add("path", ArrayType(StringType(), True))

Программа вершин:

def vertexProgram(vd, msg):
    if msg == None or vd.__getitem__(0) < msg.__getitem__(0):
        return (vd.__getitem__(0), vd.__getitem__(1), vd.__getitem__(2))
    else:
        return (msg.__getitem__(0), vd.__getitem__(1), msg.__getitem__(2))
vertexProgramUdf = F.udf(vertexProgram, vertColSchema)

Исходящие сообщения:

def sendMsgToDst(src, dst):
    srcDist = src.__getitem__(0)
    dstDist = dst.__getitem__(0)
    if srcDist < (dstDist - 1):
        return (srcDist + 1, src.__getitem__(1), src.__getitem__(2) + [dst.__getitem__(1)])
    else:
        return None
sendMsgToDstUdf = F.udf(sendMsgToDst, vertColSchema)

Агрегирование сообщений:

def aggMsgs(agg):
    shortest_dist = sorted(agg, key=lambda tup: tup[1])[0]
    return (shortest_dist.__getitem__(0), shortest_dist.__getitem__(1), shortest_dist.__getitem__(2))
aggMsgsUdf = F.udf(aggMsgs, vertColSchema)

4. Объедините части

from graphframes.lib import Pregel
result = graph.pregel.withVertexColumn(colName = "vertCol", \
    initialExpr = F.when(F.col("node")==(F.lit("root")), F.struct(F.lit(0.0), F.col("node"), F.array(F.col("node")))) \
    .otherwise(F.struct(F.lit(float("inf")), F.col("node"), F.array(F.lit("")))).cast(vertColSchema), \
    updateAfterAggMsgsExpr = vertexProgramUdf(F.col("vertCol"), Pregel.msg())) \
    .sendMsgToDst(sendMsgToDstUdf(F.col("src.vertCol"), Pregel.dst("vertCol"))) \
    .aggMsgs(aggMsgsUdf(F.collect_list(Pregel.msg()))) \
    .setMaxIter(10) \
    .setCheckpointInterval(2) \
    .run()
result.select("vertCol.path").show(truncate=False)   

Примечания:

  • maxIter должно быть установлено на значение, по крайней мере, такое же большое, как самый длинный путь. Если значение больше, результат не изменится, но время вычислений станет больше. Если значение слишком мало, в результате будут отсутствовать более длинные пути. Текущая версия GraphFrames (0.8.0) не поддерживает остановку l oop, когда больше не отправляются новые сообщения. Для
  • checkpointInterval должно быть установлено значение меньше maxIter. Фактическое значение зависит от данных и доступного оборудования. Когда возникает исключение OutOfMemory или сеанс Spark зависает на некоторое время, значение может быть уменьшено.

Конечный результат - обычный фрейм данных с содержимым

+-----------------------------+
|path                         |
+-----------------------------+
|[root, child1]               |
|[root, child1, leaf4]        |
|[root, child1, child3]       |
|[root]                       |
|[root, child1, child3, leaf6]|
|[root, child1, child3, leaf5]|
|[root, child1, leaf2]        |
+-----------------------------+

При необходимости нелистовые узлы могут быть отфильтрованы здесь.

...