Порождение процессов и обмен данными между процессами в приложении Python на основе трио - PullRequest
0 голосов
/ 04 июля 2018

Для стажировки в библиотеке Python liquidimage мы исследуем, может ли быть хорошей идеей написать параллельное приложение HPC с моделью клиент / сервер с использованием библиотеки trio .

Для асинхронного программирования и ввода / вывода трио действительно великолепно!

Тогда мне интересно, как

  1. процессы порождения (серверы, выполняющие ограниченную работу CPU-GPU)
  2. передача сложных объектов Python (потенциально содержащих большие массивы) между процессами.

Я не нашел рекомендованного способа сделать это с помощью trio в его документации (даже если учебное пособие по эхо-клиенту / серверу - хорошее начало).

Одним из очевидных способов порождения процессов в Python и связи является использование multiprocessing .

В контексте HPC, я думаю, одним хорошим решением было бы использование MPI (http://mpi4py.readthedocs.io/en/stable/overview.html#dynamic-process-management). Для справки, я также должен упомянуть rpyc (https://rpyc.readthedocs.io/en/latest/docs/zerodeploy.html#zerodeploy).

Я не знаю, можно ли использовать такие инструменты вместе с трио и как правильно это сделать.

Интересный связанный вопрос

Примечание PEP 574

Мне кажется, что PEP 574 (см. https://pypi.org/project/pickle5/) также может быть хорошим решением этой проблемы.

Ответы [ 4 ]

0 голосов
/ 04 июля 2018

Простой пример с mpi4py ... Это может быть плохая работа с точки зрения трио, но, похоже, работает.

Связь осуществляется с trio.run_sync_in_worker_thread, поэтому ( как написано Натаниэлем Дж. Смитом ) (1) без отмены (и без поддержки control-C) и (2) использовать больше памяти, чем задач трио ( но один поток Python не использует столько памяти).

Но для коммуникаций, включающих большие массивы, я бы так поступил, поскольку связь с буферизованными объектами будет очень эффективной с mpi4py .

import sys
from functools import partial

import trio

import numpy as np
from mpi4py import MPI

async def sleep():
    print("enter sleep")
    await trio.sleep(0.2)
    print("end sleep")

def cpu_bounded_task(input_data):
    print("cpu_bounded_task starting")
    result = input_data.copy()
    for i in range(1000000-1):
        result += input_data
    print("cpu_bounded_task finished ")
    return result

if "server" not in sys.argv:
    comm = MPI.COMM_WORLD.Spawn(sys.executable,
                                args=['trio_spawn_comm_mpi.py', 'server'])

    async def client():
        input_data = np.arange(4)
        print("in client: sending the input_data", input_data)
        send = partial(comm.send, dest=0, tag=0)
        await trio.run_sync_in_worker_thread(send, input_data)

        print("in client: recv")
        recv = partial(comm.recv, tag=1)
        result = await trio.run_sync_in_worker_thread(recv)
        print("in client: result received", result)

    async def parent():
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sleep)
            nursery.start_soon(client)
            nursery.start_soon(sleep)

    trio.run(parent)

    print("in client, end")
    comm.barrier()

else:
    comm = MPI.Comm.Get_parent()

    async def main_server():
        # get the data to be processed
        recv = partial(comm.recv, tag=0)
        input_data = await trio.run_sync_in_worker_thread(recv)
        print("in server: input_data received", input_data)
        # a CPU-bounded task
        result = cpu_bounded_task(input_data)
        print("in server: sending back the answer", result)
        send = partial(comm.send, dest=0, tag=1)
        await trio.run_sync_in_worker_thread(send, result)

    trio.run(main_server)
    comm.barrier()
0 голосов
/ 04 июля 2018

По состоянию на середину 2018 года, Трио еще не делает этого. Наилучший вариант на сегодняшний день - использовать trio_asyncio, чтобы использовать поддержку asyncio для функций, которые Trio еще предстоит освоить.

0 голосов
/ 04 июля 2018

Я публикую очень наивный пример кода с использованием многопроцессорной обработки и трио (в основной программе и на сервере). Вроде работает.

from multiprocessing import Process, Queue
import trio
import numpy as np

async def sleep():
    print("enter sleep")
    await trio.sleep(0.2)
    print("end sleep")

def cpu_bounded_task(input_data):
    result = input_data.copy()
    for i in range(1000000-1):
        result += input_data
    return result

def server(q_c2s, q_s2c):
    async def main_server():
        # get the data to be processed
        input_data = await trio.run_sync_in_worker_thread(q_c2s.get)
        print("in server: input_data received", input_data)
        # a CPU-bounded task
        result = cpu_bounded_task(input_data)
        print("in server: sending back the answer", result)
        await trio.run_sync_in_worker_thread(q_s2c.put, result)

    trio.run(main_server)

async def client(q_c2s, q_s2c):
    input_data = np.arange(10)
    print("in client: sending the input_data", input_data)
    await trio.run_sync_in_worker_thread(q_c2s.put, input_data)
    result = await trio.run_sync_in_worker_thread(q_s2c.get)
    print("in client: result received", result)

async def parent(q_c2s, q_s2c):
    async with trio.open_nursery() as nursery:
        nursery.start_soon(sleep)
        nursery.start_soon(client, q_c2s, q_s2c)
        nursery.start_soon(sleep)

def main():
    q_c2s = Queue()
    q_s2c = Queue()
    p = Process(target=server, args=(q_c2s, q_s2c))
    p.start()
    trio.run(parent, q_c2s, q_s2c)
    p.join()

if __name__ == '__main__':
    main()
0 голосов
/ 04 июля 2018

К сожалению, на сегодняшний день (июль 2018 года) Trio еще не поддерживает порождение и связь с подпроцессами или какими-либо высокоуровневыми оболочками для MPI или других высокоуровневых межпроцессных протоколов координации.

Это определенно то, к чему мы в конечном итоге хотим добраться, и если вы хотите поговорить более подробно о том, что должно быть реализовано, то вы можете перейти в наш чат или this Проблема содержит обзор того, что необходимо для поддержки основных подпроцессов. Но если ваша цель состоит в том, чтобы в течение нескольких месяцев что-то сработало для вашей стажировки, честно говоря, вы можете рассмотреть более зрелые инструменты HPC, такие как dask .

...