Внедрение Louvain в pyspark с использованием датафреймов - PullRequest
0 голосов
/ 19 апреля 2020

Я пытаюсь реализовать Louvain algorihtm в pyspark с использованием фреймов данных. Проблема в том, что моя реализация слишком медленная. Вот как я это делаю:

  1. Я собираю все вершины и communityIds в простые python списки
  2. Для каждой пары вершина - communityId я вычисляю прирост модульности, используя кадры данных (просто причудливая формула, включающая граничные весовые суммы / различия)
  3. Повторять до тех пор, пока не изменится

Что я делаю не так?

Я полагаю, что если бы я мог как-то распараллелить каждый l oop производительность будет увеличиваться, но как я могу это сделать?

ПОСЛЕДНЕЕ РЕДАКТИРОВАНИЕ: Я мог бы использовать vertices.foreach(changeCommunityId) вместо каждого для l oop, но затем Я должен был бы вычислить коэффициент модульности (эту причудливую формулу) без фреймов данных.

См. Пример кода ниже:

def louvain(self):

        oldModularity = 0 # since intially each node represents a community

        graph = self.graph

        # retrieve graph vertices and edges dataframes
        vertices = verticesDf = self.graph.vertices
        aij = edgesDf = self.graph.edges

        canOptimize = True

        allCommunityIds = [row['communityId'] for row in verticesDf.select('communityId').distinct().collect()]
        verticesIdsCommunityIds = [(row['id'], row['communityId']) for row in verticesDf.select('id', 'communityId').collect()]

        allEdgesSum = self.graph.edges.groupBy().sum('weight').collect()
        m = allEdgesSum[0]['sum(weight)']/2

        def computeModularityGain(vertexId, newCommunityId):

            # the sum of all weights of the edges within C
            sourceNodesNewCommunity = vertices.join(aij, vertices.id == aij.src) \
                                .select('weight', 'src', 'communityId') \
                                .where(vertices.communityId == newCommunityId);
            destinationNodesNewCommunity = vertices.join(aij, vertices.id == aij.dst) \
                                .select('weight', 'dst', 'communityId') \
                                .where(vertices.communityId == newCommunityId);

            k_in = sourceNodesNewCommunity.join(destinationNodesNewCommunity, sourceNodesNewCommunity.communityId == destinationNodesNewCommunity.communityId) \
                        .count()
            # the rest of the formula computation goes here, I just wanted to show you an example
            # just return some value for the modularity
            return 0.9  

        def changeCommunityId(vertexId, currentCommunityId):

            maxModularityGain = 0
            maxModularityGainCommunityId = None
            for newCommunityId in allCommunityIds:
                if (newCommunityId != currentCommunityId):
                    modularityGain = computeModularityGain(vertexId, newCommunityId)
                    if (modularityGain > maxModularityGain):
                        maxModularityGain = modularityGain
                        maxModularityGainCommunityId = newCommunityId

            if (maxModularityGain > 0):
                return maxModularityGainCommunityId
            return currentCommunityId

        while canOptimize:

            while self.changeInModularity:

                self.changeInModularity = False

                for vertexCommunityIdPair in verticesIdsCommunityIds:
                    vertexId = vertexCommunityIdPair[0]
                    currentCommunityId = vertexCommunityIdPair[1]
                    newCommunityId = changeCommunityId(vertexId, currentCommunityId)

                self.changeInModularity = False

            canOptimize = False
...