Как узнать соседние вершины конкретной вершины в графе (pyspark)? - PullRequest
0 голосов
/ 27 января 2019

Я пытаюсь найти соседние вершины конкретной вершины, используя API графического фрейма, доступный в pyspark.Как мне это сделать?Например, рассмотрим следующие ребра графа (он должен рассматриваться как двунаправленный, хотя ввод является направленным).

edges = [[4,3],[4,5],[5,6],[3,6],[1,3],[1,0],[0,3])
vertices = [0,1,3,4,5,6]
g = GraphFrame(vertices,edges) //this makes the graph directional, is there a way to make it bidirectional?

Now I want to do something like-
degree(3) = 5    
neighbour(3) = [4,5,6,1,0]

Вот мой код, который принимает входной файл (edge.txt), такой как

v1 v2
4 3
4 5
5 6
3 6
1 3
1 0
0 3
import sys

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName('myapp')
sc = SparkContext(conf=conf)

sc.setLogLevel("WARN")

spark = SparkSession(sc)

file_name = sys.argv[1]
log_txt = sc.textFile("/user/rikhan/"+str(file_name))
header = log_txt.first()
log_txt = log_txt.filter(lambda line : line!=header)  

temp_var = log_txt.map(lambda k: k.split(" "))

hasattr(temp_var,"toDF")
log_df = temp_var.toDF(header.split(" "))

log_df.dropDuplicates(['v1','v2'])

from functools import reduce
from pyspark.sql.functions import col,lit,when
from graphframes import *
import networkx as nx
import networkx.generators.small as gs
import matplotlib.pyplot as plt
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql import Column
from pyspark.sql import GroupedData
from pyspark.sql import DataFrameNaFunctions
from pyspark.sql import DataFrameStatFunctions
from pyspark.sql import functions
from pyspark.sql import types
from pyspark.sql import Window

edges = log_df.selectExpr("v1 as src","v2 as dst")

vertices = log_df.toPandas()['v1'].unique()
vertices2 = log_df.toPandas()['v2'].unique()
ver = vertices.tolist() + vertices2.tolist()
vertex = []
for x in ver:
    if x not in vertex:
        vertex.append(x)

rdd1 = sc.parallelize(vertex)
row_rdd = rdd1.map(lambda x: Row(x))
ver = spark.createDataFrame(row_rdd,['id'])

g = GraphFrame(ver,edges)
...