Как приостановить процессы, если они потребляют слишком много памяти? - PullRequest
0 голосов
/ 27 ноября 2018

Справочная информация: Я обрабатываю планетарные изображения, используя набор утилит командной строки, предоставленных Геологической службой США.Некоторые из них до предела (до 10 ГБ).USGS говорит, что это просто способ, которым они работают, и у них нет никаких планов, чтобы попытаться лучше управлять RAM.Я построил оболочку Python для управления списками файлов, чтобы вызывать различные шаги для обработки данных по частям (например, все изображения, сделанные в одном цветном фильтре, все в другом, все в другом и т. Д.).Поскольку все сделано для нескольких списков и нескольких изображений, я использую все доступные мне процессоры, чтобы изменить то, что в противном случае может занять от двух месяцев до недели.В настоящее время я не использую нативные методы Python для потоков;вместо этого я использую GNU Parallel (и использую os.system ("") для вызова параллельной, а затем функции) или использую Pysis , который представляет собой способ Python для вызова и многопоточности программного обеспечения USGS.

Проблема: Как уже отмечалось, некоторые шаги для некоторых файлов занимают огромное количество оперативной памяти, и невозможно заранее знать, что это может быть.Таким образом, я могу попасть в ситуацию, когда для некоторых файлов каждый процесс использует 200 МБ и работает нормально на машине с 16 ГБ ОЗУ с 8 ядрами, но затем он может начать обрабатывать другие файлы, где я получаю ползучую память, используя несколько ГБ,который с 8 процессорами на машине с 16 ГБ ОЗУ означает, что ОЗУ сжато, пространство подкачки используется ... и вот если мне повезет, и машина не просто заблокируется.

Решение?То, что я ищу, - это способ контролировать использование ОЗУ, скажем, раз в минуту, по имени процесса, и если я начинаю видеть ползучесть ОЗУ (например, 8 экземпляров процесса, каждый из которых использует более 2 ГБ ОЗУ), я могуприостановите все, кроме одного, позвольте этому одному закончить, отмените паузу другому, позвольте этому закончить и т. д., пока эти 8 не будут сделаны, затем продолжите с остальным, что может потребоваться для выполнения этого шага.Надеюсь, очевидно, что все это будет сделано в Python, а не вручную.

Возможно ли это сделать?Если да, то как?

Ответы [ 2 ]

0 голосов
/ 30 ноября 2018

parallel --memfree построено для этой ситуации:

parallel --memfree 1G doit ::: {1..100}

Это вызовет новый процесс, только если доступно более 1 ГБ ОЗУ.Если освободится менее 0,5 * 1 ГБ, он убьет самого младшего и вернет это задание в очередь.

Считалось, что он только приостанавливает / приостанавливает самое молодое задание, но опыт показал, что свопинг этого процессавремя от времени будет намного медленнее, чем просто перезапуск работы.

0 голосов
/ 27 ноября 2018

Вы можете использовать psutil.Process.suspend(), чтобы приостановить выполнение запущенных процессов, которые превышают заданный порог памяти.Мониторинговая часть просто несколько раз сравнивает psutil.Process().memory_info().rss («Размер резидентного набора») запущенных процессов с заданным вами порогом.Как вы планируете дальнейшую обработку, зависит только от вас.

В приведенном ниже примере я приостанавливаю процессы-виновники до тех пор, пока не завершится все остальное, а затем возобновляю однажды приостановленные процессы по одному.Предполагается, что это упрощенный подход для демонстрации общего механизма.

import time
import random
from threading import Thread
from multiprocessing import Process, active_children

import psutil


def format_mib(mem_bytes):
    """Format bytes into mebibyte-string."""
    return f'{mem_bytes / 2 ** 20:.2f} MiB'


def f(append_length):
    """Main function in child-process. Appends random floats to list."""
    p = psutil.Process()
    li = []
    for i in range(10):
        li.extend([random.random() for _ in range(append_length)])
        print(f'i: {i} | pid: {p.pid} | '
              f'{format_mib(p.memory_full_info().rss)}')
        time.sleep(2)


def monitored(running_processes, max_mib):
    """Monitor memory usage for running processes.
    Suspend execution for processes surpassing `max_mib` and complete
    one by one after behaving processes have finished.
    """
    running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
    suspended_processes = []

    while running_processes:
        active_children()  # Joins all finished processes.
        #  Without it, p.is_running() below on Unix would not return `False`
        #  for finished processes.
        actual_processes = running_processes.copy()
        for p in actual_processes:
            if not p.is_running():
                running_processes.remove(p)
                print(f'removed finished process: {p}')
            else:
                if p.memory_info().rss / 2 ** 20 > max_mib:
                    print(f'suspending process: {p}')
                    p.suspend()
                    running_processes.remove(p)
                    suspended_processes.append(p)

        time.sleep(1)

    for p in suspended_processes:
        print(f'\nresuming process: {p}')
        p.resume()
        p.wait()


if __name__ == '__main__':

    MAX_MiB = 200

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    processes = [Process(target=f, args=(append_length,))
                 for append_length in append_lengths]

    for p in processes:
        p.start()

    m = Thread(target=monitored, args=(processes, MAX_MiB))
    m.start()
    m.join()

Пример вывода (укороченный) с двумя процессами, приостановленными из-за превышения порогового значения 200 МБ и возобновленными после завершения действующих процессов:

i: 0 | pid: 17997 | 13.53 MiB
i: 0 | pid: 18001 | 19.70 MiB
i: 0 | pid: 17998 | 25.88 MiB
i: 0 | pid: 17999 | 41.32 MiB
i: 0 | pid: 18000 | 72.21 MiB
...
i: 2 | pid: 17997 | 20.84 MiB
i: 2 | pid: 18001 | 42.02 MiB
i: 2 | pid: 17998 | 60.56 MiB
i: 2 | pid: 17999 | 103.36 MiB
i: 2 | pid: 18000 | 215.70 MiB
suspending process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 17997 | 23.93 MiB
i: 3 | pid: 18001 | 47.75 MiB
i: 3 | pid: 17998 | 76.00 MiB
i: 3 | pid: 17999 | 141.59 MiB
...
i: 5 | pid: 17997 | 30.11 MiB
i: 5 | pid: 18001 | 68.24 MiB
i: 5 | pid: 17998 | 107.23 MiB
i: 5 | pid: 17999 | 203.52 MiB
suspending process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17997 | 33.19 MiB
i: 6 | pid: 18001 | 77.49 MiB
i: 6 | pid: 17998 | 122.59 MiB
...
i: 9 | pid: 17997 | 42.47 MiB
i: 9 | pid: 18001 | 105.68 MiB
i: 9 | pid: 17998 | 168.96 MiB
removed finished process: psutil.Process(pid=17997, status='terminated')
removed finished process: psutil.Process(pid=17998, status='terminated')
removed finished process: psutil.Process(pid=18001, status='terminated')

resuming process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 18000 | 277.46 MiB
i: 4 | pid: 18000 | 339.22 MiB
i: 5 | pid: 18000 | 400.84 MiB
...
i: 9 | pid: 18000 | 648.00 MiB

resuming process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17999 | 234.55 MiB
...
i: 9 | pid: 17999 | 327.31 MiB


Process finished with exit code 0

РЕДАКТИРОВАТЬ:

Я думаю, что мой единственный оставшийся вопрос от работы через это, как я могу заставить его порождать только определенное числопотоков [sic!] за раз, когда материал завершает добавление оставшихся, а затем делает все приостановленные в конце?

Я расширил код выше, чтобы позволить запускать новые процессы как старыезакончить с максимумом для запущенных процессов, установленным на количество ядер.Я также реорганизовал его в класс, так как в противном случае он начал бы запутываться со всем необходимым состоянием, которым нужно управлять.В приведенном ниже коде для краткости я использую названия «задачи» и «процессы» взаимозаменяемо.Обратите внимание на измененный метод запуска процессов и сопровождающий комментарий в коде.

import time
import random
from threading import Thread
from collections import deque
from multiprocessing import Process, active_children, set_start_method

import psutil

# `def format_mib` and `def f` from above unchanged...

class TaskProcessor(Thread):
    """Processor class which monitors memory usage for running
    tasks (processes). Suspends execution for tasks surpassing
    `max_mib` and completes them one by one, after behaving
    tasks have finished.
    """
    def __init__(self, n_cores, max_mib, tasks):
        super().__init__()
        self.n_cores = n_cores
        self.max_mib = max_mib  # memory threshold
        self.tasks = deque(tasks)

        self._running_tasks = []
        self._suspended_tasks = []

    def run(self):
        """Main-function in new thread."""
        self._update_running_tasks()
        self._monitor_running_tasks()
        self._process_suspended_tasks()

    def _update_running_tasks(self):
        """Start new tasks if we have less running tasks than cores."""
        while len(self._running_tasks) < self.n_cores and len(self.tasks) > 0:
            p = self.tasks.popleft()
            p.start()
            # for further process-management we here just need the
            # psutil.Process wrapper
            self._running_tasks.append(psutil.Process(pid=p.pid))
            print(f'Started process: {self._running_tasks[-1]}')

    def _monitor_running_tasks(self):
        """Monitor running tasks. Replace completed tasks and suspend tasks
        which exceed the memory threshold `self.max_mib`.
        """
        # loop while we have running or non-started tasks
        while self._running_tasks or self.tasks:
            active_children()  # Joins all finished processes.
            # Without it, p.is_running() below on Unix would not return
            # `False` for finished processes.
            self._update_running_tasks()
            actual_tasks = self._running_tasks.copy()

            for p in actual_tasks:
                if not p.is_running():  # process has finished
                    self._running_tasks.remove(p)
                    print(f'Removed finished process: {p}')
                else:
                    if p.memory_info().rss / 2 ** 20 > self.max_mib:
                        p.suspend()
                        self._running_tasks.remove(p)
                        self._suspended_tasks.append(p)
                        print(f'Suspended process: {p}')

            time.sleep(1)

    def _process_suspended_tasks(self):
        """Resume processing of suspended tasks."""
        for p in self._suspended_tasks:
            print(f'\nResuming process: {p}')
            p.resume()
            p.wait()


if __name__ == '__main__':

    # Forking (default on Unix-y systems) an already multithreaded process is
    # error-prone. Since we intend to start processes after we are already
    # multithreaded, we switch to another start-method.
    set_start_method('spawn')  # or 'forkserver' (a bit faster start up) if available

    MAX_MiB = 200
    N_CORES = 2

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    tasks = [Process(target=f, args=(append_length,))
             for append_length in append_lengths]

    tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
    tp.start()
    tp.join()

Пример вывода (сокращено):

Started process: psutil.Process(pid=9422, name='python', started='13:45:53')
Started process: psutil.Process(pid=9423, name='python', started='13:45:53')
i: 0 | pid: 9422 | 18.95 MiB
i: 0 | pid: 9423 | 31.45 MiB
...
i: 9 | pid: 9422 | 47.36 MiB
i: 9 | pid: 9423 | 175.41 MiB
Removed finished process: psutil.Process(pid=9422, status='terminated')
Removed finished process: psutil.Process(pid=9423, status='terminated')
Started process: psutil.Process(pid=9445, name='python', started='13:46:15')
Started process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 0 | pid: 9445 | 46.86 MiB
i: 0 | pid: 9446 | 77.74 MiB
...
i: 2 | pid: 9445 | 117.41 MiB
i: 2 | pid: 9446 | 220.99 MiB
Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15')
Started process: psutil.Process(pid=9450, name='python', started='13:46:21')
i: 0 | pid: 9450 | 25.16 MiB
i: 3 | pid: 9445 | 148.29 MiB
i: 1 | pid: 9450 | 36.47 MiB
i: 4 | pid: 9445 | 179.17 MiB
i: 2 | pid: 9450 | 45.74 MiB
i: 5 | pid: 9445 | 211.14 MiB
Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 3 | pid: 9450 | 55.00 MiB
...
i: 9 | pid: 9450 | 110.62 MiB
Removed finished process: psutil.Process(pid=9450, status='terminated')

Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 3 | pid: 9446 | 282.75 MiB
...
i: 9 | pid: 9446 | 655.35 MiB

Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 6 | pid: 9445 | 242.12 MiB
...
i: 9 | pid: 9445 | 334.88 MiB

Process finished with exit code 0
...