Пул многопроцессорных процессов Python не может найти асинхронную функцию - PullRequest
0 голосов
/ 28 июня 2019

Довольно простой многопроцессорный пример. Цели:

  1. Создайте пул рабочих процессов, используя mp.Pool
  2. Выполнить какое-то преобразование (здесь простая строковая операция на line)
  3. Нажмите преобразованную линию на mp.Queue
  4. Дальнейшая обработка данных этого mp.Queue в основной программе впоследствии

Итак, давайте сделаем это:

import multiprocessing as mp

Инициировать асинхронные процессы с mp.queue

def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q

Действительно инициализировать mp_pool

no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))

Это вызывается для каждого line, который будет обрабатываться асинхронно

def process_async_main(line):
    print(line)
    q.put(line + '_asynced')

А теперь давайте начнем с apply_async

line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line))
mp_resp = handler.get()

и читать из очереди

while not q.empty():
    print(q.get()) # This should be the inital line

Сбой с:

python3 mp_process_example.py
Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process_async_main' on <module '__main__' from 'mp_process_example.py'>

Вопрос: Почему многопроцессорная система не находит основной класс?

Полный код для воспроизведения:

import multiprocessing as mp

##### Init async processes
def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q

# Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))

#This is getting called for every line to be proccesed async
def process_async_main(line):
    print(line)
    q.put(line + '_asynced')

line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line))
mp_resp = handler.get()

while not q.empty():
    print(q.get()) # This should be the inital line

1 Ответ

0 голосов
/ 28 июня 2019

Хорошо ... Я понял ... По какой-то странной причине multiprocessing не может выполнить асинхронную функцию в том же файле, что и синхронизированный код .

Написание кода следующим образом:

asynced.py

##### Init async processes
def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q

##### Function to be asycned
def process_async_main(line):
    print(line)
    mp_queue.put(line + '_asynced')

И чем mp_process_example.py:

import multiprocessing as mp
from asynced import process_async_main, process_pool_init_per_process


# Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))

line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line,))
mp_resp = handler.get()

while not q.empty():
    print(q.get()) # This should be the inital line + "_asynced"

Работает как положено:

$ python3 mp_process_example.py
Hi, this is a test to test mp_queues with mp process pools
Hi, this is a test to test mp_queues with mp process pools_asynced
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...