Я чувствую, что на данный момент я далек от хорошей практики. Я использую каркас торнадо, в то же время используя ProcessPoolExecutor
import tornado.ioloop
import datetime
import tornado.web
import azure.functions as func
import json
import logging
import requests
import sys
import os
import pymongo
import mongo_config
import re
import concurrent.futures
from azure_model import Pytorch_Azure
MONGO_URL = mongo_config.uri()
mongo_client = pymongo.MongoClient(MONGO_URL)
db = mongo_client['db']
prods = mongo_client['db']['products']
pta = Pytorch_Azure()
def parallel_pred(img):
r = requests.get(img, timeout = 10)
img_id = img.split('/')[-1].split('.')[0]
img_name = 'tmp{}.png'.format(img_id)
with open(img_name, 'wb') as f:
f.write(r.content)
prediction = pta.predict(img_name)
os.remove(img_name)
return prediction
class Predictionator(tornado.web.RequestHandler):
def data_received(self, chunk):
pass
def get(self):
merchant_id = self.get_argument('id', None, True)
prod_type = self.request.uri.split('category=')[1].split('&id=')[0].replace('%20', ' ').replace('%26', '&').replace('%27', '\'')
pred_list = []
outputs = {}
print(type(prod_type))
if merchant_id and prod_type:
counter = 0
try:
print(prod_type)
for i in prods.find({'merchant': int(merchant_id), 'details.product_type':re.compile('^' + prod_type + '$', re.IGNORECASE)}):
prod_img = i['merchantImages'][0]
if prod_img not in pred_list:
pred_list.append(prod_img)
counter += 1
if counter == 5:
break
except:
self.write({'body': 'There was an error with the query. Please ensure you are using a correct merchant id and product type'})
print(pred_list)
if pred_list:
try:
executor = concurrent.futures.ProcessPoolExecutor(4)
for pred_out in executor.map(parallel_pred, pred_list, timeout = 15):
if pred_out['label'] not in outputs.keys():
outputs[pred_out['label']] = 1
else:
outputs[pred_out['label']] += 1
except:
self.write({'body': 'There was an issue making the predictions.'})
if outputs:
prediction = {}
prediction['label'] = max(outputs, key = outputs.get)
prediction['object_id'] = db.categories.find_one({'name':prediction['label']})['_id']
print(outputs)
self.write(json.dumps(prediction))
else:
self.write({'statusCode': 400, 'body':'An error occurred.'})
else:
self.write({'statusCode': 400, 'body':'There were no results returned. Please ensure the id parameter has a valid merchant id and the category id has a valid product type'})
else:
self.write({'statusCode': 400, 'body':'Please pass a name on the query string or in the request body'})
def make_app():
return tornado.web.Application([
(r'/categorize',Predictionator),
])
def start_nado():
print('starting nado')
app = make_app()
server = app.listen(8888)
return server
def restart():
python = sys.executable
os.execl(python, python, * sys.argv)
def stop_nado():
ioloop = tornado.ioloop.IOLoop.instance()
ioloop.add_callback(ioloop.stop)
ioloop.add_callback(ioloop.close)
print('stopping nado')
def main():
while True:
try:
try:
server = start_nado()
tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado)
tornado.ioloop.IOLoop.current().start()
except OSError:
print('restarting')
restart()
except KeyboardInterrupt:
tornado.ioloop.IOLoop.instance().stop()
break
if __name__ == "__main__":
try:
main()
except OSError:
tornado.ioloop.IOLoop.instance.stop()
main()
Основная проблема связана с классом Predictionator. Идея состоит в том, что он берет 5 продуктов из базы данных и делает прогноз для каждого и возвращает, какой класс имел больше всего прогнозов. Это прекрасно работает, но это заняло некоторое время, поэтому мы хотели распараллелить его с процессами. Первая проблема возникла из-за зависания, когда она делала прогнозы на двоих, а затем становилась совершенно безразличной. Это когда tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado)
стал решением, по существу, перезапускать сервер торнадо каждые 10 минут. После этого произошла ошибка OSError: [Errno 24] Too many open files
. Именно тогда функция restart
стала [хакерским] решением, которое в значительной степени просто перезапустило бы программу. Весь этот процесс работал довольно хорошо в течение примерно 2 дней, и после этого сервер, на котором он работал, полностью перестал отвечать на запросы. На данный момент, я просто ищу точку в правильном направлении, я сомневаюсь, что проблема в торнадо, но должен ли я вообще использовать другую структуру? Я довольно новичок в торнадо и параллельных процессах с Python. Спасибо