Версия: redis == 3.2.0, gevent == 1.4.0
Платформа: Python 2.7, CentOS Linux release 7.4.1708
Описание:
Более подробный журнал — во вложении kline.zip.
Это журнал времени выполнения синхронного кода: данные получаются очень быстро.
kline-log Tue, 12 Mar 2019 01:41:03 ERROR Total time running [coinbell_bc_btc_15] redis update_aug: 0.0138018131256 seconds
kline-log Tue, 12 Mar 2019 01:41:03 ERROR Total time running [coinbell_bc_btc_15] total kline aug: 0.0759608745575 seconds
Это журнал времени выполнения асинхронного кода: получение данных занимает больше 1 секунды, а иногда 4–5 секунд.
kline-log Tue, 12 Mar 2019 01:51:03 ERROR Total time running [coinbell_bc_btc_15] redis update_aug: 1.5217051506 seconds
kline-log Tue, 12 Mar 2019 01:51:04 ERROR Total time running [coinbell_bc_btc_15] total kline aug: 1.61521601677 seconds
Моя конфигурация redis.
kwargs — это параметры подключения: хост, порт и пароль. При работе с redis я получаю один и тот же экземпляр клиента по ключу из redis_pools.
# Upper bound on connections per pool; it is read as
# config.redis_max_connections when a ConnectionPool is built
# (presumably this line lives in the config module -- confirm).
redis_max_connections = 4
def __redis_conn(**kwargs):
    """Build a StrictRedis client backed by its own connection pool.

    ``kwargs`` are forwarded unchanged to ``redis.ConnectionPool`` (the
    connection details: host, port, password).  The pool is capped at
    ``config.redis_max_connections`` connections and enables TCP keepalive.
    """
    pool = redis.ConnectionPool(
        max_connections=config.redis_max_connections,
        # socket_timeout=config.redis_socket_timeout,
        socket_keepalive=True,
        **kwargs
    )
    return redis.StrictRedis(connection_pool=pool)
def _init_redis(platform_name, key, **kwargs):
    """Create a client from ``kwargs`` and register it in ``redis_pools``.

    The registry entry is named ``"<platform_name>_<key>"``; an existing
    entry with the same name is replaced.
    """
    client = __redis_conn(**kwargs)
    redis_pools['{}_{}'.format(platform_name, key)] = client
# Registry of redis clients, keyed by "<platform>_<key>" (both lower-cased).
redis_pools = {}

# Eagerly create one client per (platform, key) entry of the configuration;
# each innermost value is the kwargs dict for that connection.
for platform_name, setting in config.redis_config.iteritems():
    for key, c in setting.iteritems():
        _init_redis(platform_name.lower(), key.lower(), **c)
def _get_redis(platform_name, key):
    """Return the client registered for ``platform_name`` and ``key``.

    Both parts are lower-cased before the lookup, so the lookup is
    case-insensitive.  Raises ``KeyError`` if nothing was registered
    under that name.
    """
    pool_name = '{}_{}'.format(platform_name.lower(), key.lower())
    return redis_pools[pool_name]
# Shortcut: save_redis(platform_name) returns that platform's client
# registered under the key "save".
save_redis = partial(_get_redis, key="save")
Мой синхронный код:
def kline(mod_name='coinbell', run_key='kline_aug:1w', table_name=None):
    """Run the K-line spiders of one module sequentially, in an endless loop.

    Every pass imports ``fibo_spider.kline_spiders.<mod_name>``, builds its
    spiders for ``run_key`` and calls ``run()`` on them one after another.
    When ``table_name`` is given, only spiders whose ``table_name`` equals it
    are run.  The duration of each pass and of each spider is reported
    through the 'kline-log' logger at error level.

    The loop has no exit condition; it only stops when an exception
    propagates out of it.
    """
    while True:
        with t_with_time('finish kline aug', logger.get('kline-log').error):
            module = util.import_mod("fibo_spider.kline_spiders.%s" % mod_name)
            for spider in module.spiders(run_key):
                selected = table_name is None or spider.table_name == table_name
                if not selected:
                    continue
                # Parenthesised single argument: prints the same text under
                # the Python 2 print statement.
                print(spider)
                with t_with_time('{} finish kline aug'.format(spider.table_name),
                                 logger.get('kline-log').error):
                    spider.run()
Мой асинхронный код. Для ускорения я использую gevent.pool
в нескольких процессах.
@func_time(logger.get("kline-log").error, "pool")
def pool_func(func, func_list, pool_size=50):
    """Run ``func(item)`` for every item of ``func_list`` on a gevent pool.

    Blocks until every spawned greenlet has finished.

    :param func: callable invoked once per item.
    :param func_list: iterable of items, each passed as the single argument.
    :param pool_size: maximum number of greenlets running at the same time
        (defaults to the previously hard-coded 50).
    """
    pool = Pool(pool_size)
    for func_part in func_list:
        # pool.spawn() waits for a free slot *before* starting the greenlet.
        # The former pool.add(gevent.spawn(...)) scheduled the greenlet first
        # and only then waited for a slot, so the size limit was not strictly
        # enforced.
        pool.spawn(func, func_part)
    # The old ``except Exception: raise`` wrapper was a no-op and is gone;
    # errors propagate exactly as before.
    pool.join()
@func_time(logger.get("kline-log").error, "process")
def multi_process(func_part_list, process_num, run_func):
    """Spread ``func_part_list`` over child processes and wait for all of them.

    The list is shuffled in place (the caller's list is modified), split with
    ``util.split_list(func_part_list, process_num)``, and every resulting
    chunk is handed to ``pool_func(run_func, chunk)`` in its own process.

    NOTE(review): the nesting was reconstructed from a paste that lost its
    indentation; only the shuffle is assumed to be guarded by the
    ``if func_part_list`` check -- confirm against the real file.
    """
    if func_part_list:
        random.shuffle(func_part_list)

    workers = []
    for chunk in util.split_list(func_part_list, process_num):
        # Original comment (translated): speed up the collection.
        worker = Process(target=pool_func, args=(run_func, chunk))
        workers.append(worker)
        worker.start()

    for worker in workers:
        worker.join()

    # Drop the references to the finished Process objects.
    del workers[:]
def run(tar):
    """Call ``tar.run()`` and discard its result.

    This is the default ``run_func`` of ``run_kline``: a plain module-level
    function that can be passed to worker processes and greenlets.
    """
    tar.run()
def run_kline(program_name, func, key, module_list, process_num=8, run_func=run, wait=5,
              idle_wait=4):
    """Endlessly refresh the coin config, collect work items and run them.

    Each iteration:

    1. calls ``ParseYaml.update_coin_config(program_name)``;
    2. calls ``func(key, module_list)`` to obtain the list of work items;
    3. if the list is non-empty, runs it through ``multi_process`` and sleeps
       ``wait`` seconds; otherwise sleeps ``idle_wait`` seconds.

    Any exception raised inside an iteration is logged to 'kline-log' and the
    loop continues.

    :param program_name: passed to ``ParseYaml.update_coin_config``.
    :param func: callable ``func(key, module_list)`` returning the work items.
    :param key: run key forwarded to ``func`` and used in the log message.
    :param module_list: forwarded to ``func``.
    :param process_num: forwarded to ``multi_process``.
    :param run_func: callable executed for every work item.
    :param wait: seconds to sleep after a pass that did work.
    :param idle_wait: seconds to sleep when there was nothing to do or the
        pass failed (defaults to the previously hard-coded 4).
    """
    while True:
        try:
            with t_with_time('update-yaml', logger.get('kline-log').error):
                ParseYaml.update_coin_config(program_name)
            with t_with_time('update-func', logger.get('kline-log').error):
                func_par_list = func(key, module_list)
            if func_par_list:
                multi_process(func_par_list, process_num, run_func)
                logger.get("kline-log").info("{} multi_process finish".format(key))
                time.sleep(wait)
            else:
                time.sleep(idle_wait)
        except Exception:
            logger.get("kline-log").error(util.error_msg())
            # Back off before retrying.  Without this, a persistent failure
            # (e.g. an unreachable backend) made the loop spin at full speed
            # and flood the log.
            time.sleep(idle_wait)
def get_kline_spider(key, module_list):
    """Collect the spiders for ``key`` from every module in ``module_list``.

    Each name is turned into an import path with ``IMPORT_PATH_FORMAT`` and
    loaded through ``util.reload_mod``.  Modules that have no truthy
    ``spiders`` attribute are skipped.  Returns one flat list holding
    whatever each module's ``spiders(key)`` yields.
    """
    collected = []
    for name in module_list:
        with util.reload_mod(IMPORT_PATH_FORMAT % name) as mod:
            factory = getattr(mod, 'spiders', None)
            if factory:
                collected.extend(factory(key))
    return collected
@cli.command()
@click.argument('program_name')
@click.argument('key')
def aug(program_name, key):
    """更新K线到redis(全量增量)"""
    # The docstring above is kept verbatim because click displays it as the
    # command's help text.  In English: "Update K-lines into redis (full and
    # incremental)".
    #
    # Runs the collection loop for one program: the spider key is
    # "kline_aug:<key>", work is split across 2 processes and the loop
    # pauses 1 second between passes.
    run_kline(
        program_name,
        get_kline_spider,
        "kline_aug:{}".format(key),
        [program_name],
        process_num=2,
        wait=1,
    )
Основной код: достаточно вызвать метод run.
class KlineAug(BaseRedis):
    """Update K-lines into redis.

    One instance handles one K-line table.  ``run()`` checks whether the
    newest trade in the source redis differs from the last one processed;
    if so, it loads the most recent K-line rows from the database and writes
    them into a redis hash.

    ``table_name``, ``run_key``, ``src_key``, ``src_field`` and ``src_redis``
    are read here but never assigned in this class; presumably ``BaseRedis``
    sets them -- confirm.

    NOTE(review): this class was reconstructed from a paste that had lost all
    indentation.  Block nesting follows the most plausible reading; the spots
    that could not be decided from the text are marked below.
    """

    # Maximum number of K-line rows loaded from the database per update.
    max_k_period = config.max_k_period

    def __init__(self, platform_name, table_name, key, field=None, parse_data_func=None, run_key=None):
        """Set up the redis/database handles and the local stash.

        :param parse_data_func: callable used by ``parse_data``.
        The remaining arguments are forwarded to ``BaseRedis.__init__``.
        """
        super(KlineAug, self).__init__(platform_name, table_name, key, field, run_key=run_key)
        self.parse_data_func = parse_data_func
        # Destination hash: K-line prefix plus the table name with everything
        # from its last "_" onwards cut off.  This replaces any ``self.key``
        # the base class may have set.
        self.key = util.redis_key(const.REDIS_KEY_PREFIX.KLINE, self.table_name[:self.table_name.rfind('_')])
        self.redis = dao.save_redis(platform_name)
        self.db = dao.kline(platform_name)
        self.info_db = dao.info(platform_name)
        self.stash = Stash('kline_{}_aug'.format(table_name))
        # Stash keys: id of the last processed trade, time of the last update.
        self.ticker_last_tid_key = 'trade_last_tid'
        self.update_kline_dt_flag = 'kline_update_dt'

    def _update_aug(self):
        """Tell whether the newest source trade is one not processed yet.

        Reads the trades stored under ``src_key`` with 'kline' replaced by
        'trades' and compares the ``tid`` of the first entry with the one
        kept in the stash.

        :return: ``(is_updated, new_trade)``; ``(False, None)`` when the
            trades payload is empty.
        """
        key = self.src_key.replace('kline', 'trades')
        # NOTE(review): only the redis read is assumed to be inside the timer.
        with t_with_time('[%s] redis update_aug' % (self.table_name,), logger.get('kline-log').error):
            trade = util.json_loads(util.decompress(self.src_redis.get(key)))
        if trade:
            last_tid = self.stash.get(self.ticker_last_tid_key)
            new_trade = trade[0]
            logger.get('kline-log').error('[{}] {} {}'.format(self.table_name, last_tid, new_trade))
            return last_tid != new_trade['tid'], new_trade
        return False, None

    def run(self, params=None):
        """Refresh this table's K-lines in redis when a new trade has arrived.

        Returns early, doing nothing, while
        ``util.update_limit_dt(update_dt, config.kline_wait_sec)`` is falsy
        for the stored update time.  Exceptions raised while loading or
        publishing are logged to 'kline-log', not propagated.

        :param params: unused.
        """
        with t_with_time('[%s] total kline aug' % (self.table_name,), logger.get('kline-log').error):
            update_dt = self.stash.get(self.update_kline_dt_flag)
            if update_dt and not util.update_limit_dt(update_dt, config.kline_wait_sec):
                logger.get('kline-log').error(
                    '[{}] {} {}'.format(self.table_name, update_dt, config.kline_wait_sec))
                return
            is_updated, new_trade = self._update_aug()
            try:
                if is_updated:
                    with t_with_time('[%s] kline aug get db' % (self.table_name,), logger.get('kline-log').error):
                        # Newest rows first, capped at max_k_period rows.
                        rows = QS(self.db).table(getattr(T, self.table_name)).order_by(
                            F.timestamp, desc=True).limit(0, self.max_k_period).select('*')
                    if rows:
                        df = pd.DataFrame(rows)
                        # Publish in ascending time order, timestamps * 1000.
                        df.sort_values('timestamp', inplace=True)
                        df['timestamp'] = df['timestamp'].apply(lambda x: x * 1000)
                        self.insert_data(df)
                        # ``rows`` is still in descending order, so rows[-1]
                        # is the OLDEST candle.  Compare the trade price with
                        # the close of the newest candle instead.
                        latest_close = max(rows, key=lambda row: row['timestamp'])['close']
                        logger.get('kline-log').error(
                            '[{}] {} {}'.format(self.table_name, latest_close, new_trade['price']))
                        # Mark the trade as processed only once the newest
                        # candle's close equals the trade price.
                        if new_trade and latest_close == util.safe_decimal(new_trade['price']):
                            self.stash[self.ticker_last_tid_key] = new_trade['tid']
                        self.stash[self.update_kline_dt_flag] = util.nowdt()
                    else:
                        self.none_data()
                    # NOTE(review): placement reconstructed; this may belong
                    # one level out (run even without a new trade) -- confirm.
                    self.insert_info()
            except Exception:
                logger.get('kline-log').error(util.error_msg())

    def insert_info(self):
        """Upsert a ``collect_info`` row for this table; no-op without ``run_key``.

        The table name must consist of exactly four "_"-separated parts
        (platform, two symbols and one ignored part); otherwise the unpacking
        raises ``ValueError``.
        """
        if self.run_key:
            db = self.info_db
            platform, symbola, symbolb, _ = self.table_name.split('_')
            with transaction(db) as trans:
                QS(db).table(T.collect_info).insert({
                    'type': self.run_key.lower(),
                    'symbol': '{}_{}'.format(symbola, symbolb).lower(),
                    'time': util.now(),
                    'platform': platform.lower(),
                }, on_duplicate_key_update={
                    'time': util.now(),
                })
                trans.finish()

    def insert_data(self, df):
        """Write the K-lines of ``df`` into the redis hash ``self.key``.

        Two fields are set in one ``hmset`` call: ``src_field`` receives the
        compressed JSON of all rows, and ``"<src_field>:last"`` receives the
        uncompressed JSON of the last row of ``df``.
        """
        last_key = str(self.src_field) + ":last"
        minute = self.src_field
        minute_data = util.compress(self._df2json(df))
        last_key_data = self._df2json(df.iloc[-1])
        with t_with_time('[%s] kline aug send redis' % (self.table_name,),
                         logger.get('kline-log').error):
            self.redis.hmset(self.key, {
                minute: minute_data,
                last_key: last_key_data,
            })

    def _df2json(cls, df):
        """Serialise the OHLCV columns of ``df`` (DataFrame or single row) to JSON.

        No ``@classmethod`` decorator is visible and the method is called as
        ``self._df2json(...)``, so ``cls`` actually receives the instance; it
        is not used.
        """
        rows = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist()
        return util.json_dumps(rows, default=str)

    def parse_data(self, data):
        """Return ``self.parse_data_func(data)``."""
        return self.parse_data_func(data)

    def none_data(self):
        """Delete the redis hash ``self.key`` (used when the DB has no rows)."""
        self.redis.delete(self.key)