Правильно ли инициировать многопроцессорность в классе __init__? - PullRequest
0 голосов
/ 01 октября 2018
from multiprocessing.dummy import Pool as ThreadPool
class TSNew:
    def __init__(self):
        self.redis_client = redis.StrictRedis(host="172.17.31.147", port=4401, db=0)
        self.global_switch = 0
        self.pool = ThreadPool(40) # init pool
        self.dnn_model = None
        self.nnf = None
        self.md5sum_nnf = "initialize"
        self.thread = threading.Thread(target=self.load_model_item)
        self.ts_picked_ids = None
        self.thread.start()

        self.memory = deque(maxlen=3000)
        self.process = threading.Thread(target=self.process_user_dict)
        self.process.start()

    def load_model_item(self):
        '''
        code
        '''
    def predict_memcache(self,user_dict):
        '''
        code
        '''
    def process_user_dict(self):
        while True:
            '''
            code to generate user_dicts which is a list 
            '''
            results = self.pool.map(self.predict_memcache, user_dicts)
            '''
            code
            '''
TSNew_ = TSNew()

def get_user_result():
    logging.info("----------------come in ------------------")
    if request.method == 'POST':
        user_dict_json = request.get_data()# userid
        if user_dict_json == '' or user_dict_json is None:
            logging.info("----------------user_dict_json is ''------------------")
            return ''
        try:
            user_dict = json.loads(user_dict_json)
        except:
            logging.info("json load error, pass")
            return ''
        TSNew_.memory.append(user_dict)
        logging.info('add to deque TSNew_.memory size: %d  PID: %d', len(TSNew_.memory), os.getpid())
        logging.info("add to deque userid: %s, nation: %s \n",user_dict['user_id'],  user_dict['user_country'])
        return 'SUCCESS\n'


@app.route('/', methods=['POST'])
def get_ts_gbdt_id():
    return get_user_result()

from werkzeug.contrib.fixers import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=4444)

Я создаю пул из нескольких потоков в классе __init__ и использую self.pool для сопоставления функции predict_memcache.У меня есть два сомнения: (a) Должен ли я инициализировать пул в __init__ или просто инициализировать его прямо перед

results = self.pool.map(self.predict_memcache, user_dicts)

(b) Поскольку пул является многопоточной операцией, и он выполняется в потокеprocess_user_dict, есть ли скрытая ошибка?Благодаря.

1 Ответ

0 голосов
/ 01 октября 2018

Вопрос (а):

Это зависит.Если вам нужно запустить process_user_dict более одного раза, то имеет смысл запустить пул в конструкторе и продолжить его работу.Создание пула потоков всегда сопряжено с некоторыми издержками, и, поддерживая пул между вызовами на process_user_dict, вы избежите этих дополнительных издержек.

Если вы просто хотите обработать один набор входных данных, вы также можете создатьваш бассейн прямо внутри process_user_dict.Но, вероятно, не прямо перед results = self.pool.map(self.predict_memcache, user_dicts), потому что это создаст пул для каждой итерации окружающего цикла while.

В вашем конкретном случае это не имеет никакого значения.Вы создаете свой TSNew_ объект на уровне модуля, чтобы он оставался живым (и вместе с ним пул потоков), пока ваше приложение работает;один и тот же пул потоков из того же экземпляра TSNew используется для обработки всех запросов в течение времени жизни app.run().Поскольку вы, кажется, используете эту конструкцию с self.process = threading.Thread(target=self.process_user_dict) в качестве своего рода слушателя на self.memory, создание пула в конструкторе функционально эквивалентно созданию пула внутри process_user_dict (но вне цикла).

Вопрос (b):

Технически, по умолчанию не существует скрытой ошибки при создании потока внутри потока.В конце концов, конечным родителем любого дополнительного потока всегда является MainThread, который неявно создается для каждого экземпляра интерпретатора Python.По сути, каждый раз, когда вы создаете поток внутри программы Python, вы создаете поток в потоке.

На самом деле, ваш код даже не создает поток внутри потока.Ваш self.pool создан внутри MainThread.Когда пул создается с помощью self.pool = ThreadPool(40), он создает желаемое количество (40) рабочих потоков, плюс один поток рабочего обработчика, один поток обработчика задач и один поток обработчика результатов.Все это дочерние потоки MainThread.Все, что вы делаете в отношении своего пула в потоке под self.process, вызывает его метод map для назначения ему задач.

Однако я не очень понимаю, что вы делаете с этимself.process здесь.Делая предположение, я бы сказал, что вы хотите запустить цикл в process_user_dict, чтобы он действовал как * слушатель на self.memory, так что пул начинает обрабатывать user_dict, как только они начинают появляться в deque в self.memory.Из того, что я вижу, что вы делаете в get_user_result, вы, похоже, получаете один user_dict за запрос.Я понимаю, что у вас могут быть параллельные пользовательские сессии, проходящие в этих диктовках, но действительно ли вы видите, что benfit из process_user_dict работает в бесконечном цикле по сравнению с простым вызовом TSNew_.process_user_dict() после TSNew_.memory.append(user_dict)?Вы можете даже полностью пропустить self.memory и передать диктант непосредственно process_user_dict, если я не пропущу то, что вы нам не показали.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...