python pool apply_async и map_async не блокируются в полной очереди - PullRequest
10 голосов
/ 07 марта 2012

Я довольно новичок в питоне. Я использую модуль многопроцессорной обработки для чтения строк текста в stdin, преобразования их некоторым образом и записи в базу данных. Вот фрагмент моего кода:

batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        pool.apply_async(insert, args=(batch,i+1))
        batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()

Теперь, когда все работает нормально, пока я не обработаю огромные входные файлы (сотни миллионов строк), которые я направляю в свою программу на python. В какой-то момент, когда моя база данных работает медленнее, я вижу, что память переполняется.

После некоторой игры выяснилось, что pool.apply_async и pool.map_async никогда не блокируются, поэтому очередь обрабатываемых вызовов увеличивается и увеличивается.

Какой правильный подход к моей проблеме? Я ожидаю, что параметр, который я могу установить, будет блокировать вызов pool.apply_async, как только будет достигнута определенная длина очереди. AFAIR в Java, можно предоставить ThreadPoolExecutor BlockingQueue с фиксированной длиной для этой цели.

Спасибо!

Ответы [ 4 ]

11 голосов
/ 08 марта 2012

На всякий случай, если кто-то здесь окажется, вот как я решил проблему: я перестал использовать многопроцессорную работу. Вот как я это делаю сейчас:

#set amount of concurrent processes that insert db data
processes = multiprocessing.cpu_count() * 2

#setup batch queue
queue = multiprocessing.Queue(processes * 2)

#start processes
for _ in range(processes): multiprocessing.Process(target=insert, args=(queue,)).start() 

#fill queue with batches    
batch=[]
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        queue.put((batch,i+1))
        batch = []
if batch:
    queue.put((batch,i+1))

#stop processes using poison-pill
for _ in range(processes): queue.put((None,None))

print "all done."

в методе вставки обработка каждой партии заключена в цикл, который вытягивается из очереди, пока не получит отравленную таблетку:

while True:
    batch, end = queue.get()
    if not batch and not end: return #poison pill! complete!
    [process the batch]
print 'worker done.'
9 голосов
/ 09 сентября 2013

Функции apply_async и map_async предназначены не для блокировки основного процесса. Для этого Pool поддерживает внутренний Queue, размер которого, к сожалению, невозможно изменить.

Способ решения проблемы - использование Semaphore, инициализированного с размером, который вы хотите, чтобы очередь была. Вы приобретаете и освобождаете семафор до заполнения пула и после того, как работник завершил задачу.

Вот пример работы с Python 2.6 или выше.

from threading import Semaphore
from multiprocessing import Pool

def task_wrapper(f):
    """Python2 does not allow a callback for method raising exceptions,
    this wrapper ensures the code run into the worker will be exception free.

    """
    try:
        return f()
    except:
        return None

class TaskManager(object):
    def __init__(self, processes, queue_size):
        self.pool = Pool(processes=processes)
        self.workers = Semaphore(processes + queue_size)

    def new_task(self, f):
        """Start a new task, blocks if queue is full."""
        self.workers.acquire()
        self.pool.apply_async(task_wrapper, args=(f, ), callback=self.task_done))

    def task_done(self):
        """Called once task is done, releases the queue is blocked."""
        self.workers.release()

Другой пример с использованием реализации concurrent.futures пулов.

2 голосов
/ 07 марта 2012

apply_async возвращает объект AsyncResult, который можно wait включить:

if len(batch) >= 10000:
    r = pool.apply_async(insert, args=(batch, i+1))
    r.wait()
    batch = []

Хотя, если вы хотите сделать это более чистым способом, вам следуетиспользуйте multiprocessing.Queue с maxsize 10000 и извлекайте класс Worker из multiprocessing.Process, который выбирает из такой очереди.

1 голос
/ 22 сентября 2018

Не красиво, но вы можете получить доступ к внутреннему размеру очереди и подождать, пока она не станет ниже вашего максимального желаемого размера, прежде чем добавлять новые элементы:

max_pool_queue_size = 20

for i in range(10000):
  pool.apply_async(some_func, args=(...))

  while pool._taskqueue.qsize() > max_pool_queue_size:
    time.sleep(1)
...