Использование Asyncio для создания новых процессов Python - PullRequest
0 голосов
/ 17 мая 2019

Я настраиваю функцию для асинхронного запуска нового процесса для запуска очень тяжелой функции процессора.Большая часть документации не охватывает это полностью, и то, что я собрал, похоже, не работает асинхронно.

У меня есть функция procManager, которая принимает функцию, аргументы для передачи вфункция и имя объекта для базовой регистрации.

async def procManager(f,a,o):
    print(f"{o} started at {time.strftime('%X')}")
    p = Process(target=f, args=(a,))
    p_parent = os.getppid()   # parent process
    p_curr = os.getpid()     # current process
    print("parent process:", p_parent)
    print("current process:", p_curr)
    p.start()
    p.join()
    print(f"{o} finished at {time.strftime('%X')}")
    print("=========")

У меня есть эта тяжелая функция ЦП, которая запускает обнаружение сообщества Лувена на графе networkX, который я передаю в def procManager, чтобы вызвать новый процесс.

def community(cg):
    start = timer()
    partition = c.best_partition(cg) #default louvain community detection
    v = {} #create dict to group nodes by community
    for key, value in sorted(partition.items()):
        v.setdefault(value, []).append(key)
    stop = timer()
    print(stop-start)

Основная функция выглядит так.Я инициализирую 2 графика A и B по 3000 и 1000 узлов соответственно, со средней степенью 5. Я использую ноутбук Jupyter для этого, поэтому я использую await main() вместо asyncio.run.

A = nx.barabasi_albert_graph(3000,5)  
B = nx.barabasi_albert_graph(1000,5)  

async def main():
    task1 = asyncio.create_task(
        procManager(community, A, "A"))

    task2 = asyncio.create_task(
        procManager(community, B, "B"))

    print("async start")

await main()

То, что я пытаюсь сделать, - это обрабатывать А и В асинхронно (т.е. запускать одновременно), но в разных процессах.Токовые выходы выглядят следующим образом, где A и B обрабатываются в новых процессах, но блокируются.Мне нужно будет вычислить сообщества A и B асинхронно, потому что они будут вызваны потоком rabbitMQ, а ответы должны быть неблокирующими.

async done
A started at 06:03:48
parent process: 5783
current process: 12121
11.424800566000158
A finished at 06:03:59
=========
B started at 06:03:59
parent process: 5783
current process: 12121
0.037437027999885686
B finished at 06:03:59
=========

Надеюсь, вы, ребята, можете помочь!

Ответы [ 2 ]

1 голос
/ 17 мая 2019

В вашем случае проблема заключается в методе join().Он блокируется до завершения процесса.Кроме того, вам даже не понадобится asyncio для этого.Посмотрите на этот быстрый пример:

import time
from multiprocessing import Process

def procManager(f,a,o):
    print(f"{o} started at {time.strftime('%X')}")
    p = Process(target=f, args=(a,))
    p.start()
    # p.join()
    print(f"{o} finished at {time.strftime('%X')}") # This will occur immediately
    print("=========")

def community(cg):
    for i in range(10):
        print("%s - %s" %(cg, i))
        time.sleep(1)

procManager(community, "This is A", "A")
procManager(community, "This is B", "B")

Это должно дать вам представление о том, как решить вашу проблему.Надеюсь, это поможет!

0 голосов
/ 17 мая 2019

Что касается Asyncio, вам нужно использовать метод asyncio.create_task. Хитрость этого метода заключается в том, что вы должны указывать только те функции, которые вы объявили асинхронными. Чтобы запустить их, вы должны использовать await asyncio.gather.

Примером будет:

import asyncio

async def print_hello(name):
    print("Hello! {}".format(name))

name_list = ["billy", "bob", "buffalo bob"]

for item in name_list:
    await asyncio.gather(print_hello(item))

Самой простой формой создания и запуска подпроцессов с помощью asyncio является метод create_task, описанный здесь: Документы Asyncio

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...