python декремент пространства, используемого в памяти, когда предмет выбирается из очереди - PullRequest
0 голосов
/ 10 марта 2020

в тот момент, когда моя программа запускает потоки производителя и потребительский поток, все потоки собирают и вставляют элементы в объект общей очереди и сохраняют ссылку на него.

Я знаю, что сборщик мусора очищает памяти, когда у вас больше нет ссылок на объект, но моя проблема в другом, так как я должен сохранить ссылки на очередь, чтобы продолжить вставку или удаление элементов

Запуск программы, я заметил, что когда я вставляю элемент в очередь, пространство памяти, выделенное для очереди, увеличивается вместе с длиной очереди, в то время как когда я беру элемент из очереди, его размер уменьшается, а пространство памяти, назначенное очереди, не уменьшается, есть ли способ уменьшить пространство памяти тоже? С этой проблемой выделенная память продолжает расти для каждого вставленного элемента, пока он не займет всю доступную память.

ОБНОВЛЕНИЕ После некоторого исследования, которое я обнаружил, кажется, что проблема очень похожа на мою https://python-forum.io/Thread-Queue-Queue-would-not-reduce-capacity-after-get также предлагают создать новую очередь и обменяться, как я могу сделать это эффективно?

пример моего кода

Основной поток создает process_queue и запускает Threads

#this is the main thread
import queue
process_queue = ProcessQueue()
#main thread starts StartProcessThread and SendingThread, passes process_queue as param

ProcessQueue

class ProcessQueue():

    def __init__(self):
        self.q = queue.Queue()

    def put(self,elem):
        self.q.put(elem)

    def get(self):
        elem = self.q.get()
        temp = queue.Queue()

        #swap queue

        return elem

    def task_done(self):
        self.q.task_done()

StartProcessThread

class StartProcessThread(Thread):
    def __init__(self, process_queue):
        super().__init__()
        self.process_queue = process_queue
        #some vars

    #some methods


    def start_download(self, name):

        try:
            #starts a process
            process = Popen(
                ["path", "params"])
        except Exception as e:
            return False

        process_list = [process, name]

        self.process_queue.put(process_list)

        return True


    def task(self, url, name):
        #something

        if self.start_download(name):
            #something
            return True
        else:

            return False

    def run(self):
        # threadpoolexecutor
        executor = ThreadPoolExecutor(max_workers=3)
        while True:
            strm = #get something from db

            if str: 
                executor.submit(self.task, (url))
            else:
                sleep(10)

SendingThread

class SendingThread(Thread):
    def __init__(self,process_queue):
        super(SendingThread, self).__init__()
        self.process_queue = process_queue

    def get_process(self):
        process = self.process_queue.get()
        poll = process[0].poll()
        # if poll == none process still alive
        if poll is not None:
            return process

        self.process_queue.put(process)
        return []

    def run(self):
        while True:
            process = self.get_process()
            if process:
                # do something
...