Многопроцессорная обработка Python (3.7+): замените конвейерное соединение между мастером и работниками асинхронным для параллельного ввода-вывода - PullRequest
0 голосов
/ 23 сентября 2018

Предположим, у нас есть следующая игрушечная версия конвейера master-worker для параллельного сбора данных

# pip install gym
import gym
import numpy as np
from multiprocessing import Process, Pipe

def worker(master_conn, worker_conn):
    master_conn.close()

    env = gym.make('Pendulum-v0')
    env.reset()

    while True:
        cmd, data = worker_conn.recv()

        if cmd == 'close':
            worker_conn.close()
            break
        elif cmd == 'step':
            results = env.step(data)
            worker_conn.send(results)

class Master(object):
    def __init__(self):
        self.master_conns, self.worker_conns = zip(*[Pipe() for _ in range(10)])
        self.list_process = [Process(target=worker, args=[master_conn, worker_conn], daemon=True) 
                             for master_conn, worker_conn in zip(self.master_conns, self.worker_conns)]
        [p.start() for p in self.list_process]
        [worker_conn.close() for worker_conn in self.worker_conns]

    def go(self, actions):
        [master_conn.send(['step', action]) for master_conn, action in zip(self.master_conns, actions)]
        results = [master_conn.recv() for master_conn in self.master_conns]

        return results

    def close(self):
        [master_conn.send(['close', None]) for master_conn in self.master_conns]
        [p.join() for p in self.list_process]

master = Master()
l = []
T = 1000
for t in range(T):
    actions = np.random.rand(10, 1)
    results = master.go(actions)
    l.append(len(results))

sum(l)

Из-за соединений Pipe между master каждым рабочим, для каждого временного шага мы должны отправлять командуработник через трубу, и работник отправляет обратно результаты.Нам нужно сделать это на длительный период.Это иногда будет немного медленным из-за частой связи.

Поэтому мне интересно, если бы я понял, если использовать последнюю асинхронную функцию Python в сочетании с Process для замены Pipe, это может быть потенциально ускорено из-за параллелизма ввода-вывода, если я понимаюего функциональность правильно.

1 Ответ

0 голосов
/ 23 сентября 2018

В многопроцессорном модуле уже есть решение для параллельной обработки задач: multiprocessing.Pool

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

Вы можете добиться того же, используя multiprocessing.Queue.Я считаю, что именно так pool.map() реализуется внутри.

Итак, в чем разница между multiprocessing.Queue и multiprocessing.Pipe?Queue это просто Pipe плюс некоторый механизм блокировки.Поэтому несколько рабочих процессов могут совместно использовать только один Queue (или, точнее, 2 - один для команд, один для результатов), но для Pipe каждому процессу потребуется собственный Pipe (или пара, или дуплексный).), именно так, как вы это делаете сейчас.

Единственный недостаток Queue - это производительность - поскольку все процессы используют один мьютекс очереди, он плохо масштабируется для многих процессов.Чтобы быть уверенным, что он может обрабатывать десятки тысяч элементов / с, я бы выбрал Pipe, но для классического варианта использования параллельной обработки я думаю, что Queue или просто Pool.map() могут быть в порядке, потому что их гораздо проще использовать.(Управление процессами может быть сложным, и asyncio также не облегчает его.)

Надеюсь, это поможет, я знаю, что я ответил на несколько иной вопрос, чем вы задавали:)

...