в тот момент, когда моя программа запускает потоки производителя и потребительский поток, все потоки собирают и вставляют элементы в объект общей очереди и сохраняют ссылку на него.
Я знаю, что сборщик мусора очищает памяти, когда у вас больше нет ссылок на объект, но моя проблема в другом, так как я должен сохранить ссылки на очередь, чтобы продолжить вставку или удаление элементов
Запуск программы, я заметил, что когда я вставляю элемент в очередь, пространство памяти, выделенное для очереди, увеличивается вместе с длиной очереди, в то время как когда я беру элемент из очереди, его размер уменьшается, а пространство памяти, назначенное очереди, не уменьшается, есть ли способ уменьшить пространство памяти тоже? С этой проблемой выделенная память продолжает расти для каждого вставленного элемента, пока он не займет всю доступную память.
ОБНОВЛЕНИЕ После некоторого исследования, которое я обнаружил, кажется, что проблема очень похожа на мою https://python-forum.io/Thread-Queue-Queue-would-not-reduce-capacity-after-get также предлагают создать новую очередь и обменяться, как я могу сделать это эффективно?
пример моего кода
Основной поток создает process_queue и запускает Threads
#this is the main thread
import queue
process_queue = ProcessQueue()
#main thread starts StartProcessThread and SendingThread, passes process_queue as param
ProcessQueue
class ProcessQueue():
def __init__(self):
self.q = queue.Queue()
def put(self,elem):
self.q.put(elem)
def get(self):
elem = self.q.get()
temp = queue.Queue()
#swap queue
return elem
def task_done(self):
self.q.task_done()
StartProcessThread
class StartProcessThread(Thread):
def __init__(self, process_queue):
super().__init__()
self.process_queue = process_queue
#some vars
#some methods
def start_download(self, name):
try:
#starts a process
process = Popen(
["path", "params"])
except Exception as e:
return False
process_list = [process, name]
self.process_queue.put(process_list)
return True
def task(self, url, name):
#something
if self.start_download(name):
#something
return True
else:
return False
def run(self):
# threadpoolexecutor
executor = ThreadPoolExecutor(max_workers=3)
while True:
strm = #get something from db
if str:
executor.submit(self.task, (url))
else:
sleep(10)
SendingThread
class SendingThread(Thread):
def __init__(self,process_queue):
super(SendingThread, self).__init__()
self.process_queue = process_queue
def get_process(self):
process = self.process_queue.get()
poll = process[0].poll()
# if poll == none process still alive
if poll is not None:
return process
self.process_queue.put(process)
return []
def run(self):
while True:
process = self.get_process()
if process:
# do something