python многопроцессорный пул - PullRequest
0 голосов
/ 30 апреля 2020

При использовании многопроцессорного пула python сколько заданий отправлено?

Как это решено? Можем ли мы как-то это контролировать? Как минимум 10 заданий в очереди, чтобы уменьшить использование памяти.

Предположим, у меня есть магистральный код, написанный ниже: Для каждого хрома и симуляции я читаю данные как pandas dataframe.

(я думал, что чтение данных перед отправкой задания было бы лучше, чтобы уменьшить количество операций ввода-вывода, связанных с рабочим процессом)

Затем я отправляю фрейм данных pandas каждому работнику для его обработки.

Но похоже, что отправлено больше заданий, чем завершено заданий, и это приводит к ошибке памяти.

numofProcesses = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=numofProcesses)
jobs=[]


all_result1={}
all_result2={}

def accumulate(result):
 result1=result[0]
 result2=result[1]
 accumulate(resulst1,all_result1)
 accumulate(resulst2,all_result2)
 print('ACCUMULATE')

for each chr:
 for each sim:
     chrBased_simBased_df= readData(chr,sim)
     jobs.append(pool.apply_async(func, args=(chrBased_simBased_df,too,many,),callback=accumulate))
     print('Submitted job:%d' %(len(jobs)))

pool.close()
pool.join()

Есть ли способ избавиться от нее?

1 Ответ

2 голосов
/ 30 апреля 2020

И multiprocessing.Pool, и concurrent.futures.ProcessPoolExecutor не позволяют ограничивать количество задач, которые вы отправляете работникам.

Тем не менее, это очень тривиальное расширение, которое вы можете создать самостоятельно, используя семафор.

Вы можете проверить пример в этом gist . Он использует модуль concurrent.futures, но портировать его на multiprocessing.Pool тоже не составит труда.

from threading import BoundedSemaphore
from concurrent.futures import ProcessPoolExecutor


class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.
    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()

        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)

        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()


if __name__ == '__main__':
    pool = MaxQueuePool(ProcessPoolExecutor, 8)
    f = pool.submit(print, "Hello World!")
    f.result()
...