У меня есть dag в Airflow, dag, обрабатывающий документы с информацией для изображений, и я запускаю этот dag для 500K документов, каждый документ содержит URL с изображением, загруженное изображение из этого URL (используйте для этого multiprocessing.ThreadPool), открываемый с помощью PIL. Изображение и прогнозируемая нейронная сеть категории для этого изображения. Я кодировал обработку изображений в пакетном режиме (5K изображений в каждом). Как освободить память, когда начнут обрабатываться следующие партии? В профилировщике памяти при вызове функции генератора увеличивается объем памяти (для img_batch, url_to_id_map в self.images_generator (img_index_docs_list) этой строки)
class ImagePredictingOp(BaseOperator):
@apply_defaults
def __init__(self,
mng_db,
mng_models_collection,
mng_image_index_collection,
*args, **kwargs):
super(ImagePredictingOp, self).__init__(*args, **kwargs)
self.mng_db = mng_db
self.mng_models_collection = mng_models_collection
self.mng_image_index_collection = mng_image_index_collection
self.is_scheduled = True
def images_generator(self, docs):
chunk_size = 5000
gc.collect()
del gc.garbage[:]
for batch in range(0, len(docs), chunk_size):
img_index_docs = docs[batch: batch + chunk_size]
url_to_id_map = {doc['Url']: doc['_id'] for doc in img_index_docs}
raw_img_list = ThreadPool(32).imap_unordered(self.get_raw_image, img_index_docs)
img_indx_raw_for_predict = {url: raw_img for raw_img, url in raw_img_list}
img_indx_for_predict = {url: img for url, img in img_indx_raw_for_predict.items() if img is not None}
yield img_indx_for_predict, url_to_id_map
def execute(self, context):
models_collection = self.init_mng_collection(self.mng_models_collection)
img_index_collection = self.init_mng_collection(self.mng_image_index_collection)
model_id_list = self.get_mng_doc_models(models_collection)
models_objects = self.get_trained_models_objects(model_id_list, models_collection)
img_index_docs_list = self.get_mng_images_doc(img_index_collection)
for img_batch, url_to_id_map in self.images_generator(img_index_docs_list):
gc.collect()
del gc.garbage[:]
for prediction_model in models_objects:
prediction_result = prediction_model.work(img_batch)
self.put_to_mongo_result(prediction_result, prediction_model.name, url_to_id_map, img_index_collection)
Это результат memory_profiler
Line # Mem usage Increment Line Contents
================================================
54 319.652 MiB 319.652 MiB @profile
55 def execute(self):
56 319.652 MiB 0.000 MiB count = 0
57 319.957 MiB 0.305 MiB models_collection = self.init_mng_collection(self.mng_models_collection)
58 319.957 MiB 0.000 MiB img_index_collection = self.init_mng_collection(self.mng_image_index_collection)
59 319.957 MiB 0.000 MiB model_id_list = ['5d9de619929ea61c40ae6267']
60 2143.152 MiB 1823.195 MiB models_objects = self.get_trained_models_objects(model_id_list, models_collection)
61 5196.199 MiB 3053.047 MiB img_index_docs_list = self.get_mng_images_doc(img_index_collection)
62 20555.500 MiB 7782.336 MiB for img_batch, url_to_id_map in self.images_generator(img_index_docs_list):
63 20555.500 MiB 0.000 MiB count += 1
64 20605.824 MiB 0.000 MiB for prediction_model in models_objects:
65 20606.246 MiB 90.785 MiB prediction_result = prediction_model.work(img_batch)
66 20605.824 MiB 0.000 MiB self.put_to_img_index(prediction_result, prediction_model.name, url_to_id_map, img_index_collection)
67 20605.824 MiB 0.000 MiB if count == 2:
68 20605.824 MiB 0.000 MiB break