Довольно простой многопроцессорный пример. Цели:
- Создайте пул рабочих процессов, используя
mp.Pool
- Выполнить какое-то преобразование (здесь простая строковая операция на
line
)
- Нажмите преобразованную линию на
mp.Queue
- Дальнейшая обработка данных этого
mp.Queue
в основной программе впоследствии
Итак, давайте сделаем это:
import multiprocessing as mp
Инициировать асинхронные процессы с mp.queue
def process_pool_init_per_process(q):
global mp_queue
mp_queue = q
Действительно инициализировать mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))
Это вызывается для каждого line
, который будет обрабатываться асинхронно
def process_async_main(line):
print(line)
q.put(line + '_asynced')
А теперь давайте начнем с apply_async
line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line))
mp_resp = handler.get()
и читать из очереди
while not q.empty():
print(q.get()) # This should be the inital line
Сбой с:
python3 mp_process_example.py
Process ForkPoolWorker-1:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process_async_main' on <module '__main__' from 'mp_process_example.py'>
Вопрос: Почему многопроцессорная система не находит основной класс?
Полный код для воспроизведения:
import multiprocessing as mp
##### Init async processes
def process_pool_init_per_process(q):
global mp_queue
mp_queue = q
# Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))
#This is getting called for every line to be proccesed async
def process_async_main(line):
print(line)
q.put(line + '_asynced')
line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line))
mp_resp = handler.get()
while not q.empty():
print(q.get()) # This should be the inital line