Исполнитель асинхронного пула процессов не работает в торнадо - PullRequest
0 голосов
/ 30 сентября 2019

Я чувствую, что на данный момент я далек от хорошей практики. Я использую каркас торнадо, в то же время используя ProcessPoolExecutor

import tornado.ioloop
import datetime
import tornado.web
import azure.functions as func
import json
import logging
import requests
import sys
import os
import pymongo
import mongo_config
import re
import concurrent.futures
from azure_model import Pytorch_Azure

MONGO_URL = mongo_config.uri()
mongo_client = pymongo.MongoClient(MONGO_URL)
db = mongo_client['db']
prods = mongo_client['db']['products']

pta = Pytorch_Azure()

def parallel_pred(img):
    r = requests.get(img, timeout = 10)
    img_id = img.split('/')[-1].split('.')[0]
    img_name = 'tmp{}.png'.format(img_id)
    with open(img_name, 'wb') as f:
        f.write(r.content)
    prediction = pta.predict(img_name)
    os.remove(img_name)
    return prediction

class Predictionator(tornado.web.RequestHandler):
    def data_received(self, chunk):
        pass

    def get(self):
        merchant_id = self.get_argument('id', None, True)
        prod_type = self.request.uri.split('category=')[1].split('&id=')[0].replace('%20', ' ').replace('%26', '&').replace('%27', '\'')
        pred_list = []
        outputs = {}
        print(type(prod_type))
        if merchant_id and prod_type:

            counter = 0
            try:
                print(prod_type)
                for i in prods.find({'merchant': int(merchant_id), 'details.product_type':re.compile('^' + prod_type + '$', re.IGNORECASE)}):


                    prod_img = i['merchantImages'][0]
                    if prod_img not in pred_list:
                        pred_list.append(prod_img)
                        counter += 1
                        if counter == 5:
                            break
            except:
                self.write({'body': 'There was an error with the query. Please ensure you are using a correct merchant id and product type'})
            print(pred_list)

            if pred_list:

                try:   

                    executor = concurrent.futures.ProcessPoolExecutor(4)
                    for pred_out in executor.map(parallel_pred, pred_list, timeout = 15):
                            if pred_out['label'] not in outputs.keys():
                                outputs[pred_out['label']] = 1
                            else:
                                outputs[pred_out['label']] += 1


                except:
                    self.write({'body': 'There was an issue making the predictions.'})


                if outputs:
                    prediction = {}
                    prediction['label'] = max(outputs, key = outputs.get)
                    prediction['object_id'] = db.categories.find_one({'name':prediction['label']})['_id']

                    print(outputs)
                    self.write(json.dumps(prediction))
                else:
                    self.write({'statusCode': 400, 'body':'An error occurred.'})
            else:
                self.write({'statusCode': 400, 'body':'There were no results returned. Please ensure the id parameter has a valid merchant id and the category id has a valid product type'})
        else:
            self.write({'statusCode': 400, 'body':'Please pass a name on the query string or in the request body'})




def make_app():
    return tornado.web.Application([
        (r'/categorize',Predictionator),
    ])

def start_nado():
    print('starting nado')
    app = make_app()
    server = app.listen(8888)
    return server

def restart():
    python = sys.executable
    os.execl(python, python, * sys.argv)

def stop_nado():
    ioloop = tornado.ioloop.IOLoop.instance()
    ioloop.add_callback(ioloop.stop)
    ioloop.add_callback(ioloop.close)
    print('stopping nado')

def main():
    while True:
        try:
            try:
                server = start_nado()
                tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado)
                tornado.ioloop.IOLoop.current().start()
            except OSError:
                print('restarting')
                restart()
        except KeyboardInterrupt:
            tornado.ioloop.IOLoop.instance().stop()
            break


if __name__ == "__main__":
    try:
        main()
    except OSError:
        tornado.ioloop.IOLoop.instance.stop()
        main()

Основная проблема связана с классом Predictionator. Идея состоит в том, что он берет 5 продуктов из базы данных и делает прогноз для каждого и возвращает, какой класс имел больше всего прогнозов. Это прекрасно работает, но это заняло некоторое время, поэтому мы хотели распараллелить его с процессами. Первая проблема возникла из-за зависания, когда она делала прогнозы на двоих, а затем становилась совершенно безразличной. Это когда tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado) стал решением, по существу, перезапускать сервер торнадо каждые 10 минут. После этого произошла ошибка OSError: [Errno 24] Too many open files. Именно тогда функция restart стала [хакерским] решением, которое в значительной степени просто перезапустило бы программу. Весь этот процесс работал довольно хорошо в течение примерно 2 дней, и после этого сервер, на котором он работал, полностью перестал отвечать на запросы. На данный момент, я просто ищу точку в правильном направлении, я сомневаюсь, что проблема в торнадо, но должен ли я вообще использовать другую структуру? Я довольно новичок в торнадо и параллельных процессах с Python. Спасибо

1 Ответ

0 голосов
/ 30 сентября 2019

Поскольку вы создаете 4 новых процесса каждый раз, когда if pred_list условие выполняется.

Как правило, в программах Tornado вы создаете глобальный объект executor и повторно используете его.

# create a global object
executor = concurrent.futures.ProcessPoolExecutor(4)

class Predictionator(...):
    ...
    def get():
        ...
        # use the global `executor` object instead of creating a new one
        for pred_out in executor.map(...)

Другой подход заключается в создании executor в операторе with...as, и эти процессы будут автоматически закрыты и очищены после выполнения их задач.

def get():
    ...
    with concurrent.futures.ProcessPoolExecutor(4) as executor:
        for pred_out in executor.map(...)

Первый подход даст вам лучшую производительность. Во втором подходе есть издержки, связанные с созданием и закрытием процессов.

...