Асинхронная многопроцессорная обработка в Python с пулом apply_async - PullRequest
0 голосов
/ 09 июля 2020

Я хотел бы обрабатывать временной график (по сути, список networkx графиков) параллельно, используя асинхронный параллелизм на машине с общей памятью. Для этого я использую Pool.apply_async() из модуля multiprocessing. Временной график состоит из 5 единичных (снимков) графиков. Для каждого единичного графа я выполняю несколько дорогостоящих в вычислительном отношении матричных операций.

Рассмотрим сначала простой последовательный пример:

#------------------------------------
# Constants
#------------------------------------
NV  = 100    # No. of vertices
NE  =  25    # No. of edges
NG  =   5    # No. of unit graphs

#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)

# Snapshot index
k = 0

# for each unit graph
for Gk in Gt:

    # Temporal adjacency matrix
    Atk = adj_mtrx(Gk)

    # Temporal weight matrix
    # ...

    # Temporal eigenvector centrality
    # ...

    k += 1

Он работает безупречно. Затем я пытаюсь назначить каждую матричную операцию работнику из пула:

#------------------------------------
# Constants
#------------------------------------
NV  = 100    # No. of vertices
NE  =  25    # No. of edges
NG  =   5    # No. of unit graphs
NP  =   2    # No. of processes

#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)

# Snapshot index
k = 0

if __name__ == '__main__':

    with Pool(processes=NP) as pool:

        # for each unit graph
        for Gk in Gt:
    
            # Temporal adjacency matrix
            Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
    
            # Temporal weight matrix
            # ...

            # Temporal eigenvector centrality
            # ...

            k += 1

Однако здесь происходит сбой кода со следующей ошибкой:

multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
TypeError: adj_mtrx() takes 1 positional argument but 100 were given
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "./aggr_vs_time_dat_par_mini.py", line 100, in <module>
    Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
TypeError: adj_mtrx() takes 1 positional argument but 100 were given

Мне нужна помощь в отладке проблема. Кажется, граф Gk раскладывается на Pool и передается в функцию как набор вершин. Также я был бы признателен, если бы вы прокомментировали (уместность) моего общего подхода к параллелизации с помощью Pool.apply_async() из multiprocessing.

Вы можете найти весь необходимый код для минимального рабочего примера ниже:

import networkx as nx
import random   as rnd
import numpy    as np

from multiprocessing import Pool

# Generates random graph
def gen_rnd_graph(nv, ne):
    
    # Create random list of sources
    Vsrc = [rnd.randint(0,nv-1) for iter in range(ne)]
    
    # Create random list of sinks
    Vsnk = [rnd.randint(0,nv-1) for iter in range(ne)]
    
    # Create random list of edge weights
    U = [rnd.random() for iter in range(ne)]
    
    # Create list of tuples {Vsrc, Vsnk, U}
    T = list(zip(Vsrc,Vsnk,U))
    
    # Create graph
    G = nx.Graph()
    
    # Create list of vertices
    V = list(range(nv))
    
    # Add nodes to graph
    G.add_nodes_from(V)
    
    # Add edges between random vertices with random edge weights
    G.add_weighted_edges_from(T)
    
    return G

# Generates time-varying graph
def gen_time_graph(nv, ne, ng):

    # Initialise list of graphs
    l = []

    for i in range(ng):
        gi = gen_rnd_graph(nv, ne)
        l.append(gi)

    return l

# Computes adjacency matrix for snaphot of time-varying graph
def adj_mtrx(Gk):

    # no. of vertices
    n = Gk.number_of_nodes()

    # adjacency matrix
    Ak = np.zeros([n,n])

    # for each vertex
    for i in range(n):
        for j in range(n):
            if Gk.has_edge(i,j): Ak[i,j] = 1
        
    return Ak

#------------------------------------
# Constants
#------------------------------------
NV  = 100    # No. of vertices
NE  =  25    # No. of edges
NG  =   5    # No. of unit graphs
NP  =   2    # No. of processes

#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)

# Snapshot index
k = 0

if __name__ == '__main__':

    with Pool(processes=NP) as pool:

        # for each unit graph
        for Gk in Gt:
        
            # Temporal adjacency matrix
            Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
        
            k += 1

1 Ответ

1 голос
/ 09 июля 2020

Из документации apply_async подпись функции

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

Следовательно, вам нужно передать Gk как кортеж, т.е. (Gk,):

Atk = pool.apply_async( adj_mtrx, (Gk,) ).get()

Фон

Ваша функция получает *Gk в качестве входных данных, что приводит к списку узлов:

import networks as nx
g = nx.karate_club_graph()
print(*g)
# 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33

1 и 0 кортежей

Подробнее о создании кортежей элементов 0 и 1: Как создать кортеж только с одним элементом или непосредственно в разделе документации python

В основном , вы создаете с помощью () кортеж длиной 0, с (Gk,) кортеж длиной 1, а для любого большего количества элементов вы можете использовать (x_1, ..., x_n) или (x_1, ..., x_n,).

* -оператор

* -оператор может использоваться для использования произвольного количества аргументов. См. Разделы python документации и перед . Точно так же вы можете использовать ** для произвольного количества аргументов ключевого слова. Для получения дополнительных сведений см. Что означает оператор «звезда» в вызове функции? и дубликаты, перечисленные в этом вопросе.

...