многопроцессорный пул и генераторы - PullRequest
0 голосов
/ 27 июня 2018

Сначала посмотрите на следующий код:

pool = multiprocessing.Pool(processes=N)
batch = []
for item in generator():
    batch.append(item)
    if len(batch) == 10:
        pool.apply_async(my_fun, args=(batch,))
        batch = []
# leftovers
pool.apply_async(my_fun, args=(batch,))

По сути, я извлекаю данные из генератора, собираю их в список и затем вызываю процесс, который потребляет пакет данных.

Это может выглядеть нормально, но когда потребители (так называемые процессы пула) медленнее, чем производитель (так называемый генератор), использование памяти основным процессом увеличивается до тех пор, пока генератор не остановится или ... системе не хватит памяти.

Как мне избежать этой проблемы?

Ответы [ 2 ]

0 голосов
/ 28 июня 2018

Используйте рецепт grouper itertools для порции данных из вашего генератора.

Использование инфраструктуры в одновременных фьючерсах для обработки отправки и извлечения задач вместе с процессами.

Вы могли бы

  • отправить группу задач; ждать, пока они закончат; затем отправьте другую группу или
  • поддерживать конвейер заполненным, отправляя новое задание каждый раз при завершении.

Настройка (попытка смоделировать ваш процесс):

import concurrent.futures
import itertools, time, collections, random
from pprint import pprint

# from itertools recipes
def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

# generator/iterator facsimile
class G:
    '''Long-winded range(n)'''
    def __init__(self, n=108):
        self.n = n
        self.a = []
    def __iter__(self):
        return self
    def __next__(self):
        #self.a.append(time.perf_counter())
        if self.n < 0:
            raise StopIteration
        x = self.n
        self.n -= 1
        return x

def my_func(*args):
    time.sleep(random.randint(1,10))
    return sum(*args)

Дождаться завершения групп задач

if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory) 
            if len(executor._pending_work_items) == nworkers:
                # wait till all complete and get the results
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.ALL_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)

    pprint(results)

Поддерживать пул процессов полный .

if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory) 
            if len(executor._pending_work_items) == nworkers:
                # wait till one completes and get the result
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.FIRST_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)

    pprint(results)
0 голосов
/ 27 июня 2018

В этом случае вы можете использовать очередь ограниченного размера.

q = multiprocessing.Queue(maxSize).

При использовании с макс. size, это обеспечит вам необходимый подсчет и заблокирует поток, который вызывает q.put (), когда он заполнен, так что вы никогда не сможете разместить на нем больше определенного количества рабочих элементов и тем самым ограничить память, необходимую для хранения ожидающие решения пункты.

В качестве альтернативы вы можете использовать счетный семафор (например, multiprocessing.BoundedSemaphore (maxSize)). Получайте его каждый раз, когда вы получаете рабочий элемент из генератора, и запускайте его в своей рабочей функции (my_fun) после обработки элемента. Таким образом, максимальное количество рабочих элементов, ожидающих обработки, никогда не превысит начальное значение семафора.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...