Я новичок в pyspark и пытаюсь понять, как работает PageRank.Я использую Spark 1.6 в Jupyter на Cloudera.Скриншоты моих вершин и ребер (а также схемы) находятся по следующим ссылкам: verticesRDD и dgeRDD
У меня есть следующий код:
#import relevant libraries for Graph Frames
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from graphframes import *
#Read the csv files
verticesRDD = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true').load("filepath/station.csv")
edgesRDD = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true').load("filepath/trip.csv")
#Renaming the id columns to enable GraphFrame
verticesRDD = verticesRDD.withColumnRenamed("station_ID", "id")
edgesRDD = edgesRDD.withColumnRenamed("Trip ID", "id")
edgesRDD = edgesRDD.withColumnRenamed("Start Station", "src")
edgesRDD = edgesRDD.withColumnRenamed("End Station", "dst")
#Register as temporary tables for running the analysis
verticesRDD.registerTempTable("verticesRDD")
edgesRDD.registerTempTable("edgesRDD")
#Note: whether i register the RDDs as temp tables or not, i get the same results... so im not sure if this step is really needed
#Make the GraphFrame
g = GraphFrame(verticesRDD, edgesRDD)
Теперь, когда я запускаю функцию pageRank:
g.pageRank(resetProbability=0.15, maxIter=10)
Py4JJavaError: Произошла ошибка при вызове o98.run .: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: Задача 0 на этапе 79.0 не выполнена 1 раз, последний сбой: Потерянная задача 0.0 на этапе 79.0 (TID 2637, localhost): scala.MatchError: [null, null, [913460,765,8 / 31/2015 23:26,Harry Bridges Plaza (Ferry Building), 50,8 / 31/2015 23: 39, Сан-Франциско Кальтрейн (Таунсенд, 4-е), 70 288, подписчик, 2139]] (из класса org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
results = g.pageRank(resetProbability=0.15, maxIter=10, sourceId="id")
Py4JJavaError: Произошла ошибка при вызове o166.run .: org.graphframes.NoSuchVertexException: алгоритм GraphFrame с заданным идентификатором вершины, который не существует в Graph.Идентификатор вершины не содержится в GraphFrame (v: [id: int, имя: строка, lat: double, long: double, счет дока: int, наземный ориентир: строка, установка: строка], e: [src: строка, dst: строка, id: int, Продолжительность: int, Дата начала: строка, Терминал начала: int, Дата окончания: строка, Терминал окончания: int, Номер велосипеда: int, Тип подписчика: строка, Почтовый индекс: строка])
ranks = g.pageRank.resetProbability(0.15).maxIter(10).run()
AttributeError: у объекта 'function' нет атрибута 'resetProbability'
ranks = g.pageRank(resetProbability=0.15, maxIter=10).run()
Py4JJavaError: Произошла ошибка при вызове o188.run .: org.apache.spark.SparkException: Задание прервано из-за этапаОшибка: Задача 0 на этапе 90.0 не выполнена 1 раз, последний сбой: Потерянная задача 0.0 на этапе 90.0 (TID 2641, localhost): scala.MatchError: [ноль, ноль, [913460,765,8 / 31/2015 23:26, Harry Bridges Plaza (Ferry Building), 50,8 / 31/2015 23: 39, Сан-Франциско Кальтрейн (Таунсенд на 4-м), 70 288, Абонент, 2139]] (класса org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
Я читаю PageRank , но не понимаюгде я иду не так ... любая помощь будет оценена