Python multiprocessing - конвейерная связь между процессами - PullRequest
1 голос
/ 12 марта 2019

Я делаю проект, который собирает данные с датчиков клиентов, обрабатывает собранные данные и отправляет их клиентам. Может быть несколько клиентов, которые запрашивают данные с нашего сервера одновременно, поэтому мне пришлось реализовать многопроцессорность. Я не могу использовать потоки, потому что есть определенные переменные, которые должны быть независимыми от клиента. Если бы я это сделал, мой код, вероятно, стал бы очень сложным для чтения и обновления, и я этого не хочу. Поэтому я решил использовать процессы, но теперь есть некоторые данные, которые необходимо разделить между родительскими и дочерними процессами. После некоторых исследований я обнаружил, что связь по трубам будет отвечать моим требованиям.

Следующий код успешно отправляет данные из родительского процесса в дочерний процесс, дочерний процесс обновляет данные и отправляет их обратно в родительский процесс. Но он работает только из-за функции sleep (), которая не позволяет родительскому каналу использовать канал одновременно с дочерним.

Как его можно изменить, чтобы он делал то же самое, но без функции sleep (), для которой, я полагаю, это, скорее всего, вызовет проблемы в будущем?

from multiprocessing import Process, Pipe
import time

def update_data(pipe):
    p_out, p_in = pipe
    L = []
    while True:
        message = p_out.recv()
        if message=='FINISHED':
            break       
        L.append(message)      

    L.append(['new data'])       #updating received data
    writer(L, p_in)              #sending received data to parent Process
    p_in.close()

def writer(i, p_in):
    p_in.send(i)
    p_in.send('FINISHED')

L = ['0' for i in range(10)]     #current data
if __name__=='__main__':
    p_out, p_in = Pipe()
    update_data_process = Process(target=update_data, args=((p_out, p_in),))
    update_data_process.start()    
    writer(L, p_in)              #sending current data to child Process
    time.sleep(3)                #needs to be changed
    while True:
        message = p_out.recv()
        if message != 'FINISHED':
            L = message
        else:
            break
    print(L)
    p_in.close()
    update_data_process.join()

1 Ответ

0 голосов
/ 12 марта 2019

У вас есть проблема, потому что вы обрабатываете соединения, как если бы они были simplex , но Pipe () по умолчанию возвращает duplex (двухсторонние) соединения. Это означает, что когда вы звоните parent_conn, child_conn = Pipe(), вы получаете одно соединение, только родитель должен использовать для чтения и записи и еще один такой объект подключения для ребенка. Родительские и дочерние элементы работают только с их объектами соединения.

from multiprocessing import Process, Pipe
from datetime import datetime

SENTINEL = 'SENTINEL'


def update_data(child_conn):

    result = []

    for msg in iter(child_conn.recv, SENTINEL):
        print(f'{datetime.now()} child received {msg}')
        result.append(msg)

    print(f'{datetime.now()} child received sentinel')
    result.append(['new data'])
    writer(child_conn, result)
    child_conn.close()


def writer(conn, data):
    conn.send(data)
    conn.send(SENTINEL)


if __name__=='__main__':

    parent_conn, child_conn = Pipe()  # default is duplex!
    update_data_process = Process(target=update_data, args=(child_conn,))
    update_data_process.start()

    data = ['0' for i in range(3)]
    writer(parent_conn, data)

    for msg in iter(parent_conn.recv, SENTINEL):
        print(f'{datetime.now()} parent received {msg}')

    print(f'{datetime.now()} parent received sentinel')
    parent_conn.close()
    update_data_process.join()

Пример вывода:

2019-03-12 00:09:52.920375 child received ['0', '0', '0']
2019-03-12 00:09:52.920512 child received sentinel
2019-03-12 00:09:52.920702 parent received [['0', '0', '0'], ['new data']]
2019-03-12 00:09:52.920764 parent received sentinel

Process finished with exit code 0

Если вы не знакомы с использованием iter(callable, sentinel), посмотрите здесь .

...