from multiprocessing.dummy import Pool as ThreadPool
class TSNew:
def __init__(self):
self.redis_client = redis.StrictRedis(host="172.17.31.147", port=4401, db=0)
self.pool = ThreadPool(40) # init pool
self.dnn_model = None
self.lock=threading.Lock()
self.t1 = threading.Thread(target=self.load_model_item)
self.t1.start()
self.t2 = threading.Thread(target=self.process_user_dict)
self.t2.start()
def load_model_item(self):
while True:
self.lock.acquire()
#here load model from disk as self.dnn_model
self.lock.release()
time.sleep(600)
def predict_memcache(self):
'''
process every element in user_dicts
'''
def process_user_dict(self,user_dict):
while True:
# construct user_dicts as a list
# use self.dnn_model to predict by self.pool
results = self.pool.map(self.predict_memcache, user_dicts)
TSNew_ = TSNew()
def get_user_result():
logging.info("----------------come in ------------------")
if request.method == 'POST':
user_dict_json = request.get_data()# userid
'''
....
'''
return 'SUCCESS\n'
@app.route('/', methods=['POST'])
def get_ts_gbdt_id():
return get_user_result()
from werkzeug.contrib.fixers import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=4444)
В классе TSNew
и process_user_dict
есть два потока, использующих ThreadPool()
для обработки user_dicts
параллельно через predict_memcache
функцию.Функция должна использовать модель keras, которая загружается с диска каждые несколько минут.Как я уже писал, во время периода загрузки модели (может занять несколько секунд) я не хочу, чтобы модель прогнозировалась в predict_memcache
, поэтому я пытаюсь использовать self.lock.acquire()
.Но, похоже, не работает и model.predict выдаст ошибку при загрузке.Итак, как правильно загрузить модель при загрузке?