multiprocess + gevent + redis становятся медленнее - PullRequest
0 голосов
/ 12 марта 2019

Версия: redis == 3.2.0, gevent == 1.4.0

Платформа : py2.7, CentOS Linux выпуск 7.4.1708

Описание :

больше журнала о kline.zip

Это журнал замеров времени для синхронного кода: данные получаются очень быстро.

kline-log    Tue, 12 Mar 2019 01:41:03 ERROR    Total time running [coinbell_bc_btc_15] redis update_aug: 0.0138018131256 seconds
kline-log    Tue, 12 Mar 2019 01:41:03 ERROR    Total time running [coinbell_bc_btc_15] total kline aug: 0.0759608745575 seconds

Это журнал замеров времени для асинхронного кода: получение данных занимает более 1 секунды, а иногда 4–5 секунд.

kline-log    Tue, 12 Mar 2019 01:51:03 ERROR    Total time running [coinbell_bc_btc_15] redis update_aug: 1.5217051506 seconds
kline-log    Tue, 12 Mar 2019 01:51:04 ERROR    Total time running [coinbell_bc_btc_15] total kline aug: 1.61521601677 seconds

Мой конфиг redis (kwargs) — это информация о подключении: хост, порт и пароль. При использовании redis я получаю тот же экземпляр redis по ключу из redis_pools.

redis_max_connections = 4

def __redis_conn(**kwargs):
    """Build a StrictRedis client backed by a dedicated connection pool.

    ``kwargs`` carries the per-platform connection info (host, port,
    password); the pool size comes from ``config.redis_max_connections``.
    """
    pool = redis.ConnectionPool(
        max_connections=config.redis_max_connections,
        # socket_timeout=config.redis_socket_timeout,
        socket_keepalive=True,
        **kwargs
    )
    return redis.StrictRedis(connection_pool=pool)

def _init_redis(platform_name, key, **kwargs):
    """Register a redis client under '<platform>_<key>' in redis_pools."""
    pool_key = '{}_{}'.format(platform_name, key)
    redis_pools[pool_key] = __redis_conn(**kwargs)


# Registry of shared redis clients, keyed '<platform>_<key>'; filled once at import.
redis_pools = {}

for platform, settings in config.redis_config.iteritems():
    for pool_name, conn_kwargs in settings.iteritems():
        _init_redis(platform.lower(), pool_name.lower(), **conn_kwargs)


def _get_redis(platform_name, key):
    """Look up the shared redis client registered for (platform, key)."""
    pool_key = '_'.join((platform_name.lower(), key.lower()))
    return redis_pools[pool_key]


# Convenience accessor: save_redis(platform_name) -> redis client of the "save" pool.
save_redis = partial(_get_redis, key="save")

Мой синхронный код

def kline(mod_name='coinbell', run_key='kline_aug:1w', table_name=None):
    while True:
        with t_with_time('finish kline aug', logger.get('kline-log').error):
            mod = util.import_mod("fibo_spider.kline_spiders.%s" % mod_name)
            spider = mod.spiders
            robot = spider(run_key)

            for s in robot:
                flag = True if table_name is None else s.table_name == table_name
                if flag:
                    print s
                    with t_with_time('{} finish kline aug'.format(s.table_name), logger.get('kline-log').error):
                        s.run()

Мой асинхронный код. Я использую gevent.Pool внутри нескольких процессов для ускорения.

@func_time(logger.get("kline-log").error, "pool")
def pool_func(func, func_list):
    """Run ``func`` over every item of ``func_list`` on a gevent pool.

    Uses ``pool.spawn`` so the pool admits a greenlet *before* it starts,
    keeping concurrency at 50 at most. The previous
    ``pool.add(gevent.spawn(...))`` started each greenlet immediately and
    only then tried to track it, so the pool limit was not reliably
    enforced. The former ``try/except Exception: raise`` was a no-op and
    has been removed.
    """
    pool = Pool(50)
    for func_part in func_list:
        pool.spawn(func, func_part)
    pool.join()


@func_time(logger.get("kline-log").error, "process")
def multi_process(func_part_list, process_num, run_func):
    """Shard func_part_list over process_num worker processes.

    Each worker process runs its shard through pool_func(run_func, shard).
    No-op when the work list is empty.
    """
    if not func_part_list:
        return
    random.shuffle(func_part_list)
    workers = []
    # split the work across processes to speed up processing
    for shard in util.split_list(func_part_list, process_num):
        worker = Process(target=pool_func, args=(run_func, shard))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()
    del workers[:]


def run(tar):
    # Default per-item worker: just execute the spider's run() method.
    tar.run()


def run_kline(program_name, func, key, module_list, process_num=8, run_func=run, wait=5):
    """Main driver loop: refresh config, build the spider list, fan out.

    program_name -- platform whose yaml config is refreshed each cycle
    func         -- callable(key, module_list) producing the work list
    key          -- spider run key, e.g. 'kline_aug:15'
    module_list  -- modules to (re)load spiders from
    process_num  -- number of worker processes handed to multi_process
    run_func     -- per-item worker, defaults to run
    wait         -- seconds to sleep after a successful cycle

    Runs forever; every exception is logged and the loop continues.
    """
    while True:
        try:
            with t_with_time('update-yaml', logger.get('kline-log').error):
                ParseYaml.update_coin_config(program_name)
            with t_with_time('update-func', logger.get('kline-log').error):
                func_par_list = func(key, module_list)
            if func_par_list:
                multi_process(func_par_list, process_num, run_func)
                logger.get("kline-log").info("{} multi_process finish".format(key))
                time.sleep(wait)
            else:
                # Nothing to do: back off briefly before polling again.
                time.sleep(4)
        except Exception:
            logger.get("kline-log").error(util.error_msg())
            # Back off so a persistent failure does not spin this loop hot.
            time.sleep(wait)

def get_kline_spider(key, module_list):
    """Reload each listed spider module and collect its spiders for ``key``.

    Modules lacking a 'spiders' attribute are skipped silently.
    """
    collected = []
    for name in module_list:
        with util.reload_mod(IMPORT_PATH_FORMAT % name) as mod:
            factory = getattr(mod, 'spiders', None)
            if factory:
                collected.extend(factory(key))
    return collected


@cli.command()
@click.argument('program_name')
@click.argument('key')
def aug(program_name, key):
    """更新K线到redis(全量增量)"""
    # Docstring above is click's user-facing help text ("update klines to
    # redis, full + incremental") and is kept verbatim.
    run_kline(
        program_name,
        get_kline_spider,
        "kline_aug:{}".format(key),
        [program_name],
        process_num=2,
        wait=1,
    )

Важный код просто запустите функцию run

class KlineAug(BaseRedis):
    """Update klines into redis (incremental refresh driven by new trades)."""
    # Maximum number of kline rows kept/pushed per table.
    max_k_period = config.max_k_period

    # db = dao.kline()

    def __init__(self, platform_name, table_name, key, field=None, parse_data_func=None, run_key=None):
        """Set up redis/db handles and stash keys for one kline table.

        platform_name   -- exchange/platform identifier, used to pick dao handles
        table_name      -- e.g. 'coinbell_bc_btc_15'; text after the last '_' is the period
        key             -- passed through to BaseRedis
        field           -- passed through to BaseRedis
        parse_data_func -- optional callable used by parse_data()
        run_key         -- when set, insert_info() records a heartbeat row
        """
        super(KlineAug, self).__init__(platform_name, table_name, key, field, run_key=run_key)
        self.parse_data_func = parse_data_func
        # Redis hash key: kline prefix + table name minus its period suffix.
        self.key = util.redis_key(const.REDIS_KEY_PREFIX.KLINE, self.table_name[:self.table_name.rfind('_')])
        self.redis = dao.save_redis(platform_name)
        self.db = dao.kline(platform_name)
        self.info_db = dao.info(platform_name)

        # Local stash persists the last processed trade id and last update time.
        self.stash = Stash('kline_{}_aug'.format(table_name))
        self.ticker_last_tid_key = 'trade_last_tid'
        self.update_kline_dt_flag = 'kline_update_dt'

    def _update_aug(self):
        """Check the trades feed for a trade newer than the last one seen.

        Returns (changed, newest_trade): changed is True when the newest
        trade's tid differs from the stashed one; (False, None) when the
        trades blob is empty/missing.
        """
        trades_key = self.src_key.replace('kline', 'trades')
        with t_with_time('[%s] redis update_aug' % (self.table_name,), logger.get('kline-log').error):
            trade = util.json_loads(util.decompress(self.src_redis.get(trades_key)))
        if not trade:
            return False, None
        newest = trade[0]
        last_tid = self.stash.get(self.ticker_last_tid_key)
        logger.get('kline-log').error('[{}] {} {}'.format(self.table_name, last_tid, newest))
        return last_tid != newest['tid'], newest

    def run(self, params=None):
        """One update cycle: skip if recently updated, otherwise re-read the
        newest klines from the DB and push them to redis.

        params is accepted for interface compatibility and ignored.
        """
        # data = self.src_redis.hget(self.src_key, self.src_field)
        # rows = util.json_loads(util.decompress(data))
        with t_with_time('[%s] total kline aug' % (self.table_name,), logger.get('kline-log').error):
            update_dt = self.stash.get(self.update_kline_dt_flag)
            # Rate limit: skip the whole cycle if the last successful update
            # happened within config.kline_wait_sec.
            if update_dt and not util.update_limit_dt(update_dt, config.kline_wait_sec):
                logger.get('kline-log').error(
                    '[{}] {} {}'.format(self.table_name, update_dt, config.kline_wait_sec))
                return
            is_updated, new_trade = self._update_aug()
            try:
                if is_updated:
                    # Fetch the newest max_k_period rows, newest first.
                    with t_with_time('[%s] kline aug get db' % (self.table_name,), logger.get('kline-log').error):
                        rows = QS(self.db).table(getattr(T, self.table_name)).order_by(
                            F.timestamp, desc=True).limit(0, self.max_k_period).select('*')
                    if rows:
                        # rows = self.parse_data(data[-self.max_k_period:])
                        df = pd.DataFrame(rows)
                        df.sort_values('timestamp', inplace=True)
                        # df = df.iloc[-self.max_k_period:]
                        # Scale timestamps by 1000 — presumably seconds to
                        # milliseconds for downstream consumers; TODO confirm.
                        df['timestamp'] = df['timestamp'].apply(lambda x: x * 1000)
                        self.insert_data(df)
                        logger.get('kline-log').error(
                            '[{}] {} {}'.format(self.table_name, rows[-1]['close'], new_trade['price']))
                        # Only mark the trade consumed once the DB close price
                        # matches the trade price; otherwise retry next cycle.
                        if new_trade and rows[-1]['close'] == util.safe_decimal(new_trade['price']):
                            self.stash[self.ticker_last_tid_key] = new_trade['tid']
                            self.stash[self.update_kline_dt_flag] = util.nowdt()
                    else:
                        self.none_data()

                self.insert_info()
            except Exception:
                # Never let one table's failure kill the worker; just log it.
                logger.get('kline-log').error(util.error_msg())

    def insert_info(self):
        """Record a heartbeat row in collect_info (upsert refreshing 'time').

        Only runs when run_key is set; used to monitor spider liveness.
        """
        if self.run_key:
            db = self.info_db
            # data_type, _ = self.src_key.split(':')
            # Assumes table name layout <platform>_<symbolA>_<symbolB>_<period>
            # — TODO confirm against table naming upstream.
            platform, symbola, symbolb, _ = self.table_name.split('_')
            with transaction(db) as trans:
                QS(db).table(T.collect_info).insert({
                    'type': self.run_key.lower(),
                    'symbol': '{}_{}'.format(symbola, symbolb).lower(),
                    'time': util.now(),
                    'platform': platform.lower(),
                }, on_duplicate_key_update={
                    'time': util.now(),
                })
                trans.finish()

    def insert_data(self, df):
        """Push the kline frame to redis: the full compressed series under
        the period field, plus the (uncompressed) last row under '<field>:last'.
        """
        period_field = self.src_field
        payload = {
            period_field: util.compress(self._df2json(df)),
            str(period_field) + ":last": self._df2json(df.iloc[-1]),
        }
        with t_with_time('[%s] kline aug send redis' % (self.table_name,),
                         logger.get('kline-log').error):
            self.redis.hmset(self.key, payload)

    def _df2json(self, df):
        """Serialize the OHLCV columns of ``df`` to a JSON list-of-rows string.

        Fix: the first parameter was named ``cls`` without @classmethod, so it
        actually received the instance — renamed to ``self`` to match reality.
        ``default=str`` stringifies non-JSON values (e.g. Decimal) on dump.
        """
        rows = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
        return util.json_dumps(rows, default=str)

    def parse_data(self, data):
        """Delegate raw-row parsing to the injected parse_data_func callable."""
        parser = self.parse_data_func
        return parser(data)

    def none_data(self):
        # No rows in the DB for this table: drop the stale kline hash from redis.
        self.redis.delete(self.key)
...