Manager (). Queue () не может быть рассмотрен - PullRequest
0 голосов
/ 13 ноября 2018

Мой сценарий использования мультипроцессора включает несколько процессов, добавляющих данные в четыре очереди, и один процесс на очередь, работающий с этими данными.Поскольку мне нужно, чтобы несколько процессов добавлялись и удалялись из каждой очереди, похоже, что multiprocessing.Manager().Queue() был подходящим инструментом для работы.

Однако, когда я порождаю свои рабочие процессы, я получаю can't pickle _thread.rlock objects, когдавыполнение mp.Process(target=my_method, args=(a,queue,)).start().

Вот в основном то, что я делаю:

import multiprocessing as mp

def create_managers():
    destination_managers = {}
    for k in desired_queues:
        destination_managers[k] = {'man': mp.Manager()}
        destination_managers[k]['q'] = destination_managers[k]['man'].Queue()
    return destination_managers

def add_to_queue(queue, source):
    data = get_item_from_source(source)
    queue.put(data)

def getn(q,n):
    result = [q.get()]
    try:
        while len(result) < n:
            result.append(q.get(block=False))
    except managers.queue.Empty:
        pass
    return result

def do_work(queue):
    while True:
        try:
            next_chunk = getn(queue,1000) # Get next 1,000 items from queue.
            result = []
            for i in next_chunk:
                # Operate on the data
                processed_data = process_the_data(next_chunk[i])
                result.append(processed_data)
            try:
                # Upload the processed chunk to a database
                insert_to_db(result)
            except:
                # Sometimes the uploads fail due to unreliable network. Put the data back in the queue so we try again later.
                for i in next_chunk:
                    queue.put(next_chunk[row])
        except queue.empty:
            pass

def spawn_putters(dest_mgrs, source_list):
    putters = []
    for k in source_list:
        proc = mp.Process(target=add_to_queue, args=(dest_mgrs[source_dest_map[k]]['q'],source_list[k],)
        putters.append(proc)
        proc.start()
    return putters

def spawn_workers(dest_mgrs):
    workers = []
    for k in dest_mgrs:
        proc = mp.Process(target=do_work,args=(dest_mgrs[k]['q'],)
        workers.append(proc)
        proc.start()
    return workers

if __name__ == "__main__":
    source_list #is defined
    destination_managers = create_managers()
    putters = spawn_putters(destination_managers, source_list)
    workers = spawn_workers(destination_managers)

Как мне убедиться, что дочерние процессы могут работать в очереди?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...