Многопроцессорная обработка Python зависает при пропускании большого массива через канал - PullRequest
0 голосов
/ 04 января 2019

Я использую многопроцессорность в python и пытаюсь передать большой массив с пустым массивом в подпроцесс через канал. Он хорошо работает с маленьким массивом, но зависает для больших массивов без возврата ошибки.

Я считаю, что канал заблокирован, и уже немного прочитал об этом, но не могу понять, как решить проблему.

def f2(conn, x):
    conn.start()
    data = conn.recv()
    conn.join()

    print(data)
    do_something(x)

    conn.close()

if __name__ == '__main__':
    data_input = read_data()    # large numpy array
    parent_conn, child_conn = Pipe()

    p = multiprocessing.Pool(processes=8)      
    func = partial(f2, child_conn)

    parent_conn.send(data_input)
    parent_conn.close()

    result = p.map(func, processes)

    p.close()
    p.join()

1 Ответ

0 голосов
/ 04 января 2019

Игнорирование всех других проблем в этом коде (у вас нет x для передачи на map, вы не используете x f2, смешивая Pool.map с Pipe обычно это неправильно), ваша главная проблема - блокировка send вызова, выполняемого до рабочего процесса, доступного для чтения из него.

Предполагая, что вы действительно хотите смешать map с Pipe, решение состоит в том, чтобы запустить map асинхронно до начала send, так что с другой стороны есть что прочитать из Pipe пока родитель пытается написать в него:

if __name__ == '__main__':
    data_input = read_data()    # large numpy array
    parent_conn, child_conn = Pipe()

    # Use with to avoid needing to explicitly close/join
    with multiprocessing.Pool(processes=8) as p:
        func = partial(f2, child_conn)

        # Launch async map to ensure workers are running
        future = p.map_async(func, x)

        # Can perform blocking send as workers will consume as you send
        parent_conn.send(data_input)
        parent_conn.close()

        # Now you can wait on the map to complete
        result = future.get()

Как отмечалось, этот код не будет запускаться из-за проблем с x, и даже если это произойдет, документация Pipe явно предупреждает, что два разных процесса не должны читать из Pipe одновременно.

Если вы хотите массово обработать данные в одном рабочем месте, вы просто используете Process и Pipe, что-то вроде:

def f2(conn):
    data = conn.recv()
    conn.close()
    print(data)

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()

    proc = multiprocessing.Process(target=f2, args=(child_conn,))
    proc.start()

    data_input = read_data()    # large numpy array
    parent_conn.send(data_input)
    parent_conn.close()

    proc.join()

Если вы хотите обрабатывать каждый элемент отдельно для множества рабочих, вы просто используете Pool и map:

def f2(x):
    print(x)

if __name__ == '__main__':
    data_input = read_data()    # large numpy array
    with multiprocessing.Pool(processes=8) as p:   
        result = p.map(f2, data_input)
...