Память не освобождена после завершения многопроцессорного пула Python - PullRequest
7 голосов
/ 07 января 2020

При использовании многопроцессорной обработки Python Pool.map() я не возвращаю свою память. Более 1 ГБ памяти все еще занято, хотя функция с Pool закрыта, все закрыто, и я даже пытаюсь удалить переменную Pool и явно вызвать сборщик мусора.

Когда в коде, показанном ниже, без комментирования двух строк над pool.map() (и комментирования строки pool.map()) все выглядит нормально, но как только при использовании multiprocessing память, кажется, не освобождается снова после выхода из function.

Поскольку в коде реального мира вызываются несколько других функций, использующих multiprocessing, это затем даже складывается, занимая всю память. (К сожалению, я не могу привести минимальный пример для второстепенного второго случая с накоплением памяти, но как только основная проблема будет решена, эта вторая тоже должна исчезнуть.)

Это Python 3.7.3 на Linux и любая помощь по крайней мере , объясняющая или даже решение этой проблемы, очень приветствуется.

Минимальный пример кода:

import gc
from time import sleep
from memory_profiler import profile
import numpy as np

def waitat(where, t):
    # print and wait, gives chance to see live memory usage in some task manager program
    print(where)
    sleep(t)

@profile
def parallel_convert_all_to_hsv(imgs: np.ndarray) -> np.ndarray:
    from skimage.color import rgb2hsv
    import multiprocessing as mp
    print("going parallel")
    pool = mp.Pool()
    try:
        # images_converted = [] # there is no memory problem when using commented lines below, instead of pool.map(…) line
        # for img in imgs:
        #     images_converted.append(rgb2hsv(img))
        images_converted = pool.map(rgb2hsv, imgs)
    except KeyboardInterrupt:
        pool.terminate()
    waitat("after pool.map",5)

    pool.close()
    pool.join()

    waitat("before del pool",5)
    pool = None
    del pool    # memory should now be freed here?
    mp = None
    rgb2hsv = None

    waitat("after del pool",5)
    print("copying over")
    res = np.array(images_converted)
    waitat("before del image_hsv in function",5)
    images_converted = None
    del images_converted
    return res

@profile
def doit():
    print("create random images")
    max_images = 700
    images = np.random.rand(max_images, 300, 300,3)

    waitat("before going parallel",5)
    images_converted = parallel_convert_all_to_hsv(images)
    print("images_converted has %i bytes" % images_converted.nbytes)
    # how to clean up Pool's memory at latest here?

    waitat("before deleting original images",5)
    images = None
    del images
    waitat("memory should be as before going parallel + %i bytes" % images_converted.nbytes ,10)
    images_converted = None
    del images_converted
    waitat("nearly end, memory should be as before" ,15)
    gc.collect(2)
    waitat("end, memory should be as before" ,15)    

doit()

Вывод с использованием Memory Profiler , показывающий проблему:

$ python3 -m memory_profiler pool-mem-probs.py
create random images
before going parallel
going parallel
after pool.map
before del pool
after del pool
copying over
before del image_hsv in function
Filename: pool-mem-probs.py

Line #    Mem usage    Increment   Line Contents
================================================
    11   1481.2 MiB   1481.2 MiB   @profile
    12                             def parallel_convert_all_to_hsv(imgs: np.ndarray) -> np.ndarray:
    13   1487.2 MiB      6.0 MiB       from skimage.color import rgb2hsv
    14   1487.2 MiB      0.0 MiB       import multiprocessing as mp
    15   1487.2 MiB      0.0 MiB       print("going parallel")
    16   1488.6 MiB      1.4 MiB       pool = mp.Pool()
    17   1488.6 MiB      0.0 MiB       try:
    18                                     # images_converted = []  # there is no memory problem when using commented lines below, instead of pool.map(…) line
    19                                     # for img in imgs:
    20                                     #     images_converted.append(rgb2hsv(img))
    21   2930.9 MiB   1442.3 MiB           images_converted = pool.map(rgb2hsv, imgs)
    22                                 except KeyboardInterrupt:
    23                                     pool.terminate()
    24   2930.9 MiB      0.0 MiB       waitat("after pool.map",5)
    25                                 
    26   2930.9 MiB      0.0 MiB       pool.close()
    27   2931.0 MiB      0.1 MiB       pool.join()
    28                                 
    29   2931.0 MiB      0.0 MiB       waitat("before del pool",5)
    30   2931.0 MiB      0.0 MiB       pool = None
    31   2931.0 MiB      0.0 MiB       del pool    # memory should now be freed here?
    32   2931.0 MiB      0.0 MiB       mp = None
    33   2931.0 MiB      0.0 MiB       rgb2hsv = None
    34                                 
    35   2931.0 MiB      0.0 MiB       waitat("after del pool",5)
    36   2931.0 MiB      0.0 MiB       print("copying over")
    37   4373.0 MiB   1441.9 MiB       res = np.array(images_converted)
    38   4373.0 MiB      0.0 MiB       waitat("before del image_hsv in function",5)
    39   4016.6 MiB      0.0 MiB       images_converted = None
    40   4016.6 MiB      0.0 MiB       del images_converted
    41   4016.6 MiB      0.0 MiB       return res


images_converted has 1512000000 bytes
before deleting original images
memory should be as before going parallel + 1512000000 bytes
nearly end, memory should be as before
end, memory should be as before
Filename: pool-mem-probs.py

Line #    Mem usage    Increment   Line Contents
================================================
    43     39.1 MiB     39.1 MiB   @profile
    44                             def doit():
    45     39.1 MiB      0.0 MiB       print("create random images")
    46     39.1 MiB      0.0 MiB       max_images = 700
    47   1481.2 MiB   1442.1 MiB       images = np.random.rand(max_images, 300, 300,3)
    48                             
    49   1481.2 MiB      0.0 MiB       waitat("before going parallel",5)
    50   4016.6 MiB   2535.4 MiB       images_converted = parallel_convert_all_to_hsv(images)
    51   4016.6 MiB      0.0 MiB       print("images_converted has %i bytes" % images_converted.nbytes)
    52                                 # how to clean up Pool's memory at latest here?
    53                             
    54   4016.6 MiB      0.0 MiB       waitat("before deleting original images",5)
    55   2574.6 MiB      0.0 MiB       images = None
    56   2574.6 MiB      0.0 MiB       del images
    57   2574.6 MiB      0.0 MiB       waitat("memory should be as before going parallel + %i bytes" % images_converted.nbytes ,10)
    58   1132.7 MiB      0.0 MiB       images_converted = None
    59   1132.7 MiB      0.0 MiB       del images_converted
    60   1132.7 MiB      0.0 MiB       waitat("nearly end, memory should be as before" ,15)
    61   1132.7 MiB      0.0 MiB       gc.collect(2)
    62   1132.7 MiB      0.0 MiB       waitat("end, memory should be as before" ,15)    

Вывод непараллельного кода (там, где проблема не возникает):

$ python3 -m memory_profiler pool-mem-probs.py
create random images
before going parallel
going parallel
after pool.map
before del pool
after del pool
copying over
before del image_hsv in function
Filename: pool-mem-probs.py

Line #    Mem usage    Increment   Line Contents
================================================
    11   1481.3 MiB   1481.3 MiB   @profile
    12                             def parallel_convert_all_to_hsv(imgs: np.ndarray) -> np.ndarray:
    13   1488.1 MiB      6.8 MiB       from skimage.color import rgb2hsv
    14   1488.1 MiB      0.0 MiB       import multiprocessing as mp
    15   1488.1 MiB      0.0 MiB       print("going parallel")
    16   1488.7 MiB      0.6 MiB       pool = mp.Pool()
    17   1488.7 MiB      0.0 MiB       try:
    18   1488.7 MiB      0.0 MiB           images_converted = []    # there is no memory problem when using commented lines below, instead of pool.map(…) line
    19   2932.6 MiB      0.0 MiB           for img in imgs:
    20   2932.6 MiB      2.2 MiB               images_converted.append(rgb2hsv(img))
    21                                     # images_converted = pool.map(rgb2hsv, imgs)
    22                                 except KeyboardInterrupt:
    23                                     pool.terminate()
    24   2932.6 MiB      0.0 MiB       waitat("after pool.map",5)
    25                                 
    26   2932.6 MiB      0.0 MiB       pool.close()
    27   2932.8 MiB      0.2 MiB       pool.join()
    28                                 
    29   2932.8 MiB      0.0 MiB       waitat("before del pool",5)
    30   2932.8 MiB      0.0 MiB       pool = None
    31   2932.8 MiB      0.0 MiB       del pool    # memory should now be freed here?
    32   2932.8 MiB      0.0 MiB       mp = None
    33   2932.8 MiB      0.0 MiB       rgb2hsv = None
    34                                 
    35   2932.8 MiB      0.0 MiB       waitat("after del pool",5)
    36   2932.8 MiB      0.0 MiB       print("copying over")
    37   4373.3 MiB   1440.5 MiB       res = np.array(images_converted)
    38   4373.3 MiB      0.0 MiB       waitat("before del image_hsv in function",5)
    39   2929.6 MiB      0.0 MiB       images_converted = None
    40   2929.6 MiB      0.0 MiB       del images_converted
    41   2929.6 MiB      0.0 MiB       return res


images_converted has 1512000000 bytes
before deleting original images
memory should be as before going parallel + 1512000000 bytes
nearly end, memory should be as before
end, memory should be as before
Filename: pool-mem-probs.py

Line #    Mem usage    Increment   Line Contents
================================================
    43     39.2 MiB     39.2 MiB   @profile
    44                             def doit():
    45     39.2 MiB      0.0 MiB       print("create random images")
    46     39.2 MiB      0.0 MiB       max_images = 700
    47   1481.3 MiB   1442.1 MiB       images = np.random.rand(max_images, 300, 300,3)
    48                             
    49   1481.3 MiB      0.0 MiB       waitat("before going parallel",5)
    50   2929.6 MiB   1448.3 MiB       images_converted = parallel_convert_all_to_hsv(images)
    51   2929.6 MiB      0.0 MiB       print("images_converted has %i bytes" % images_converted.nbytes)
    52                                 # how to clean up Pool's memory at latest here?
    53                             
    54   2929.6 MiB      0.0 MiB       waitat("before deleting original images",5)
    55   1487.7 MiB      0.0 MiB       images = None
    56   1487.7 MiB      0.0 MiB       del images
    57   1487.7 MiB      0.0 MiB       waitat("memory should be as before going parallel + %i bytes" % images_converted.nbytes ,10)
    58     45.7 MiB      0.0 MiB       images_converted = None
    59     45.7 MiB      0.0 MiB       del images_converted
    60     45.7 MiB      0.0 MiB       waitat("nearly end, memory should be as before" ,15)
    61     45.7 MiB      0.0 MiB       gc.collect(2)
    62     45.7 MiB      0.0 MiB       waitat("end, memory should be as before" ,15)    

Ответы [ 3 ]

2 голосов
/ 17 января 2020

Порог генерации может мешать, посмотрите на g c .get_threshold ()

попробуйте включить

gc.disable()
0 голосов
/ 23 января 2020

Поскольку multithreading.Pool не может освободить память размером около 1 * Гб, я также попытался заменить ее на ThreadPool, но не лучше. Я все еще задаюсь вопросом об этой проблеме утечки памяти в пулах.

Это может быть не лучшим решением, но может быть решением для обхода.

Не используя ThreadPool или ProcessPool, я создаю потоки или процессы вручную и назначаю каждому изображение для преобразования в HSV. Ну, я прокомментировал строку p = multiprocessing.Process(target=do_hsv, args=(imgs[j], shared_list)), потому что она будет порождать новый процесс для каждого преобразования изображения, который, я думаю, будет излишним и намного дороже, чем Threads. Очевидно, что создание потоков вручную займет больше времени (9 се c без утечки памяти ), чем ThreadPool (4 се c, но с утечкой памяти ), но как вы Я могу видеть, что он почти спокоен в памяти.

Вот мой код:

import multiprocessing
import os
import threading
import time
from memory_profiler import profile
import numpy as np
from skimage.color import rgb2hsv


def do_hsv(img, shared_list):
    shared_list.append(rgb2hsv(img))
    # print("Converted by process {} having parent process {}".format(os.getpid(), os.getppid()))


@profile
def parallel_convert_all_to_hsv(imgs, shared_list):

    cores = os.cpu_count()

    starttime = time.time()

    for i in range(0, len(imgs), cores):

        # print("i :", i)

        jobs = []; pipes = []

        end = i + cores if (i + cores) <= len(imgs) else i + len(imgs[i : -1]) + 1

        # print("end :", end)

        for j in range(i, end):
            # print("j :", j)

            # p = multiprocessing.Process(target=do_hsv, args=(imgs[j], shared_list))
            p = threading.Thread(target= do_hsv, args=(imgs[j], shared_list))

            jobs.append(p)

        for p in jobs: p.start()

        for proc in jobs:
            proc.join()

    print("Took {} seconds to complete ".format(starttime - time.time()))
    return 1

@profile
def doit():

    print("create random images")

    max_images = 700

    images = np.random.rand(max_images, 300, 300,3)

    # images = [x for x in range(0, 10000)]
    manager = multiprocessing.Manager()
    shared_list = manager.list()

    parallel_convert_all_to_hsv(images, shared_list)

    del images

    del shared_list

    print()


doit()

Вот вывод:

create random images
Took -9.085552453994751 seconds to complete 
Filename: MemoryNotFreed.py

Line #    Mem usage    Increment   Line Contents
================================================
    15   1549.1 MiB   1549.1 MiB   @profile
    16                             def parallel_convert_all_to_hsv(imgs, shared_list):
    17                             
    18   1549.1 MiB      0.0 MiB       cores = os.cpu_count()
    19                             
    20   1549.1 MiB      0.0 MiB       starttime = time.time()
    21                             
    22   1566.4 MiB      0.0 MiB       for i in range(0, len(imgs), cores):
    23                             
    24                                     # print("i :", i)
    25                             
    26   1566.4 MiB      0.0 MiB           jobs = []; pipes = []
    27                             
    28   1566.4 MiB      0.0 MiB           end = i + cores if (i + cores) <= len(imgs) else i + len(imgs[i : -1]) + 1
    29                             
    30                                     # print("end :", end)
    31                             
    32   1566.4 MiB      0.0 MiB           for j in range(i, end):
    33                                         # print("j :", j)
    34                             
    35                                         # p = multiprocessing.Process(target=do_hsv, args=(imgs[j], shared_list))
    36   1566.4 MiB      0.0 MiB               p = threading.Thread(target= do_hsv, args=(imgs[j], shared_list))
    37                             
    38   1566.4 MiB      0.0 MiB               jobs.append(p)
    39                             
    40   1566.4 MiB      0.8 MiB           for p in jobs: p.start()
    41                             
    42   1574.9 MiB      1.0 MiB           for proc in jobs:
    43   1574.9 MiB     13.5 MiB               proc.join()
    44                             
    45   1563.5 MiB      0.0 MiB       print("Took {} seconds to complete ".format(starttime - time.time()))
    46   1563.5 MiB      0.0 MiB       return 1



Filename: MemoryNotFreed.py

Line #    Mem usage    Increment   Line Contents
================================================
    48    106.6 MiB    106.6 MiB   @profile
    49                             def doit():
    50                             
    51    106.6 MiB      0.0 MiB       print("create random images")
    52                             
    53    106.6 MiB      0.0 MiB       max_images = 700
    54                             
    55   1548.7 MiB   1442.1 MiB       images = np.random.rand(max_images, 300, 300,3)
    56                             
    57                                 # images = [x for x in range(0, 10000)]
    58   1549.0 MiB      0.3 MiB       manager = multiprocessing.Manager()
    59   1549.1 MiB      0.0 MiB       shared_list = manager.list()
    60                             
    61   1563.5 MiB     14.5 MiB       parallel_convert_all_to_hsv(images, shared_list)
    62                             
    63    121.6 MiB      0.0 MiB       del images
    64                             
    65    121.6 MiB      0.0 MiB       del shared_list
    66                             
    67    121.6 MiB      0.0 MiB       print()
0 голосов
/ 22 января 2020

Действительно, есть проблема утечки, но она не появляется для некоторых магических параметров. Я не мог этого понять, но мы можем уменьшить утечку, передав список в pool.map вместо ndarray. images_converted = pool.map(rgb2hsv, [i for i in imgs])

Это последовательно уменьшает утечку памяти в моих тестах.

СТАРЫЙ ОТВЕТ:

Кажется, в пуле нет проблем. Вы не должны ожидать, что «del pool» в строке 31 освободит вашу память, так как ее занимают переменные «imgs» и «images_converted». Они находятся в области действия функции «parallel_convert_all_to_hsv», а не в области «rgb2hsv», поэтому «del pool» к ним не относится.

Память исправлена, освобождена после удаления «images» и « images_converted "в строках 56 и 59.

...