Я пытаюсь реализовать Louvain algorihtm в pyspark с использованием фреймов данных. Проблема в том, что моя реализация слишком медленная. Вот как я это делаю:
- Я собираю все вершины и communityIds в простые python списки
- Для каждой пары вершина - communityId я вычисляю прирост модульности, используя кадры данных (просто причудливая формула, включающая граничные весовые суммы / различия)
- Повторять до тех пор, пока не изменится
Что я делаю не так?
Я полагаю, что если бы я мог как-то распараллелить каждый l oop производительность будет увеличиваться, но как я могу это сделать?
ПОСЛЕДНЕЕ РЕДАКТИРОВАНИЕ: Я мог бы использовать vertices.foreach(changeCommunityId)
вместо каждого для l oop, но затем Я должен был бы вычислить коэффициент модульности (эту причудливую формулу) без фреймов данных.
См. Пример кода ниже:
def louvain(self):
oldModularity = 0 # since intially each node represents a community
graph = self.graph
# retrieve graph vertices and edges dataframes
vertices = verticesDf = self.graph.vertices
aij = edgesDf = self.graph.edges
canOptimize = True
allCommunityIds = [row['communityId'] for row in verticesDf.select('communityId').distinct().collect()]
verticesIdsCommunityIds = [(row['id'], row['communityId']) for row in verticesDf.select('id', 'communityId').collect()]
allEdgesSum = self.graph.edges.groupBy().sum('weight').collect()
m = allEdgesSum[0]['sum(weight)']/2
def computeModularityGain(vertexId, newCommunityId):
# the sum of all weights of the edges within C
sourceNodesNewCommunity = vertices.join(aij, vertices.id == aij.src) \
.select('weight', 'src', 'communityId') \
.where(vertices.communityId == newCommunityId);
destinationNodesNewCommunity = vertices.join(aij, vertices.id == aij.dst) \
.select('weight', 'dst', 'communityId') \
.where(vertices.communityId == newCommunityId);
k_in = sourceNodesNewCommunity.join(destinationNodesNewCommunity, sourceNodesNewCommunity.communityId == destinationNodesNewCommunity.communityId) \
.count()
# the rest of the formula computation goes here, I just wanted to show you an example
# just return some value for the modularity
return 0.9
def changeCommunityId(vertexId, currentCommunityId):
maxModularityGain = 0
maxModularityGainCommunityId = None
for newCommunityId in allCommunityIds:
if (newCommunityId != currentCommunityId):
modularityGain = computeModularityGain(vertexId, newCommunityId)
if (modularityGain > maxModularityGain):
maxModularityGain = modularityGain
maxModularityGainCommunityId = newCommunityId
if (maxModularityGain > 0):
return maxModularityGainCommunityId
return currentCommunityId
while canOptimize:
while self.changeInModularity:
self.changeInModularity = False
for vertexCommunityIdPair in verticesIdsCommunityIds:
vertexId = vertexCommunityIdPair[0]
currentCommunityId = vertexCommunityIdPair[1]
newCommunityId = changeCommunityId(vertexId, currentCommunityId)
self.changeInModularity = False
canOptimize = False