Мой сценарий использования мультипроцессора включает несколько процессов, добавляющих данные в четыре очереди, и один процесс на очередь, работающий с этими данными.Поскольку мне нужно, чтобы несколько процессов добавлялись и удалялись из каждой очереди, похоже, что multiprocessing.Manager().Queue()
был подходящим инструментом для работы.
Однако, когда я порождаю свои рабочие процессы, я получаю can't pickle _thread.rlock objects
, когдавыполнение mp.Process(target=my_method, args=(a,queue,)).start()
.
Вот в основном то, что я делаю:
import multiprocessing as mp
def create_managers():
destination_managers = {}
for k in desired_queues:
destination_managers[k] = {'man': mp.Manager()}
destination_managers[k]['q'] = destination_managers[k]['man'].Queue()
return destination_managers
def add_to_queue(queue, source):
data = get_item_from_source(source)
queue.put(data)
def getn(q,n):
result = [q.get()]
try:
while len(result) < n:
result.append(q.get(block=False))
except managers.queue.Empty:
pass
return result
def do_work(queue):
while True:
try:
next_chunk = getn(queue,1000) # Get next 1,000 items from queue.
result = []
for i in next_chunk:
# Operate on the data
processed_data = process_the_data(next_chunk[i])
result.append(processed_data)
try:
# Upload the processed chunk to a database
insert_to_db(result)
except:
# Sometimes the uploads fail due to unreliable network. Put the data back in the queue so we try again later.
for i in next_chunk:
queue.put(next_chunk[row])
except queue.empty:
pass
def spawn_putters(dest_mgrs, source_list):
putters = []
for k in source_list:
proc = mp.Process(target=add_to_queue, args=(dest_mgrs[source_dest_map[k]]['q'],source_list[k],)
putters.append(proc)
proc.start()
return putters
def spawn_workers(dest_mgrs):
workers = []
for k in dest_mgrs:
proc = mp.Process(target=do_work,args=(dest_mgrs[k]['q'],)
workers.append(proc)
proc.start()
return workers
if __name__ == "__main__":
source_list #is defined
destination_managers = create_managers()
putters = spawn_putters(destination_managers, source_list)
workers = spawn_workers(destination_managers)
Как мне убедиться, что дочерние процессы могут работать в очереди?