Игнорирование всех других проблем в этом коде (у вас нет x
для передачи на map
, вы не используете x
f2
, смешивая Pool.map
с Pipe
обычно это неправильно), ваша главная проблема - блокировка send
вызова, выполняемого до рабочего процесса, доступного для чтения из него.
Предполагая, что вы действительно хотите смешать map
с Pipe
, решение состоит в том, чтобы запустить map
асинхронно до начала send
, так что с другой стороны есть что прочитать из Pipe
пока родитель пытается написать в него:
if __name__ == '__main__':
data_input = read_data() # large numpy array
parent_conn, child_conn = Pipe()
# Use with to avoid needing to explicitly close/join
with multiprocessing.Pool(processes=8) as p:
func = partial(f2, child_conn)
# Launch async map to ensure workers are running
future = p.map_async(func, x)
# Can perform blocking send as workers will consume as you send
parent_conn.send(data_input)
parent_conn.close()
# Now you can wait on the map to complete
result = future.get()
Как отмечалось, этот код не будет запускаться из-за проблем с x
, и даже если это произойдет, документация Pipe
явно предупреждает, что два разных процесса не должны читать из Pipe
одновременно.
Если вы хотите массово обработать данные в одном рабочем месте, вы просто используете Process
и Pipe
, что-то вроде:
def f2(conn):
data = conn.recv()
conn.close()
print(data)
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
proc = multiprocessing.Process(target=f2, args=(child_conn,))
proc.start()
data_input = read_data() # large numpy array
parent_conn.send(data_input)
parent_conn.close()
proc.join()
Если вы хотите обрабатывать каждый элемент отдельно для множества рабочих, вы просто используете Pool
и map
:
def f2(x):
print(x)
if __name__ == '__main__':
data_input = read_data() # large numpy array
with multiprocessing.Pool(processes=8) as p:
result = p.map(f2, data_input)