Я пытаюсь понять, почему моя программа постоянно наращивает использование памяти. Я не продвинутый программист, как вы можете видеть по коду ниже, ха-ха, но, думаю, я сузил проблему до списков, занимающих память. Я пытался отлаживать с помощью профилировщика и распечаток использования памяти, но вижу только, что общее потребление растёт.
В основном программа собирает данные с криптовалютной биржи и записывает их в базу данных MySQL.
Не уверен, стоит ли классифицировать это как утечку памяти или как проблему управления памятью. Буду благодарен за любую помощь.
"""
@profile
"""
import datetime
import os
import sys
import ccxt
from time import sleep
import mysql.connector
from mysql.connector import errorcode
import pandas as pd
from pympler import muppy
from pympler import summary
from memory_profiler import profile
# Put the directory three levels above this file on the import path.
root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(root)

# Time constants; ``hold`` is the number of seconds quoted in the retry
# message printed by ``ticker``.
msec = 1000
minute = 60 * msec
hold = 30

# Exchanges and symbols handled by this script.
exchange_list = ['gemini']
ticker_exchange_list = ['gemini']
gemini_currency = ['LTC/USD']

# First connection carries no default schema; the database is selected
# (and created when missing) further down.
cnx = mysql.connector.connect(user='brad', password='beerisgood', host='127.0.0.1')

DB_NAME = 'gemini'
# Back-quoted because the table name contains a slash.
TBL_NAME = '`LTC/USD`'
# Maps each table name to the DDL statement that creates it.
TABLES = {
    TBL_NAME: (
        "CREATE TABLE {} (id INT AUTO_INCREMENT PRIMARY KEY, "
        "trade_time_ms BIGINT(32), orderbook_time_ms VARCHAR(55), "
        "opening FLOAT, high FLOAT, close FLOAT, low FLOAT, "
        "ltc_traded_volume FLOAT, bid FLOAT, ask FLOAT, "
        "bid_volume FLOAT, ask_volume FLOAT);"
    ).format(TBL_NAME),
}

# Buffered cursor: every result set is fetched completely on execute().
cursor = cnx.cursor(buffered=True)
@profile()
def create_database(cursor):
    """Create the database named by the module-level ``DB_NAME``.

    Args:
        cursor: Open MySQL cursor used to run the ``CREATE DATABASE``
            statement.

    Raises:
        SystemExit: With status 1 when the server rejects the statement;
            the error is printed first.
    """
    try:
        cursor.execute(
            "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
    except mysql.connector.Error as err:
        print("Failed creating database: {}".format(err))
        # sys.exit() is always available; the bare exit() helper is only
        # injected by the ``site`` module and may be missing.
        sys.exit(1)
# Select the working database, creating it on the first run.
try:
    cursor.execute("USE {}".format(DB_NAME))
except mysql.connector.Error as err:
    print("Database {} does not exists.".format(DB_NAME))
    if err.errno == errorcode.ER_BAD_DB_ERROR:
        create_database(cursor)
        print("Database {} created successfully.".format(DB_NAME))
        cnx.database = DB_NAME
    else:
        print(err)
        # sys.exit() instead of the site-provided exit() helper.
        sys.exit(1)

# Create every table listed in TABLES; an already existing table is fine.
for table_name, table_description in TABLES.items():
    try:
        print("Creating table {}: ".format(table_name), end='')
        cursor.execute(table_description)
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
            print("already exists.")
        else:
            print(err.msg)
    else:
        print("OK")

# Reconnect with DB_NAME as the default schema.  The old cursor is closed
# explicitly so that it does not outlive the connection it belongs to.
cursor.close()
cnx.close()
cnx = mysql.connector.connect(user='brad', password='beerisgood', host='127.0.0.1', database=DB_NAME)
cursor = cnx.cursor(buffered=True)

# The functions below read ``exchange.exchangeName`` and
# ``exchange.database``; ccxt is expected to expose these extra config keys
# as attributes on the exchange instance.
gemini = ccxt.gemini({
    'rateLimit': 3000,
    'enableRateLimit': True,
    'exchangeName': "gemini",
    'database': cnx,
})
@profile()
def ticker(exchange, currency):
    """Fetch ticker and top-of-book data for *currency* and store one row.

    The ticker and a one-level order book are requested from *exchange*,
    the price deltas are obtained from ``delta`` and the combined values are
    handed to ``database_write``.  After the write, and also after a handled
    exchange error, the function sleeps ``exchange.rateLimit / 1000`` seconds.

    Args:
        exchange: ccxt exchange instance; ``exchangeName`` and ``rateLimit``
            are read from it.
        currency: Market symbol, e.g. ``'LTC/USD'``.

    Returns:
        A dict with the keys ``trading_pair_id``, ``trading_pair``,
        ``price``, ``price_delta_1h``, ``price_delta_24h`` and
        ``price_updated_at``, or ``None`` when one of the handled ccxt
        errors occurred.

    Raises:
        Any exception other than the handled ccxt errors propagates to the
        caller (for example ``IndexError`` for an empty order book or
        ``TypeError`` when the ticker carries no last price).
    """
    # Pre-bind the result: previously the name was unbound on the error
    # path and the final ``return`` raised UnboundLocalError.
    trading_pairs = None
    try:
        # Named ``ticker_data`` so that it does not shadow this function.
        ticker_data = exchange.fetch_ticker(currency)
        order_book = exchange.fetch_order_book(currency, limit=1)
        order_book_timestamp = exchange.last_response_headers['Date']

        best_bid = order_book['bids'][0]
        best_ask = order_book['asks'][0]
        print(best_bid[1], ' ', float(best_bid[0]), ' || ', float(best_ask[0]), ' ', best_ask[1])

        ob_timestamp = pd.Timestamp(order_book_timestamp)
        closing = ticker_data['last']
        trade_timestamp = ticker_data['timestamp']

        # Missing (None) or zero values are stored as 0.
        open_ = ticker_data['open'] or 0
        high = ticker_data['high'] or 0
        low = ticker_data['low'] or 0
        ltc_traded_volume = ticker_data['baseVolume'] or 0
        bid = ticker_data['bid'] or 0
        ask = ticker_data['ask'] or 0
        bid_volume = float(best_bid[1])
        ask_volume = float(best_ask[1])

        price_delta_1h, price_delta_24h = delta(exchange, currency)
        print(str(exchange.exchangeName) + " " + str(currency) + " Delta 1h: " + str(price_delta_1h))
        print(str(exchange.exchangeName) + " " + str(currency) + " Delta 24h: " + str(price_delta_24h))

        trading_pairs = {
            "trading_pair_id": id_convert(exchange, currency),
            "trading_pair": currency,
            "price": float(closing),
            "price_delta_1h": price_delta_1h,
            "price_delta_24h": price_delta_24h,
            "price_updated_at": trade_timestamp,
        }
        database_write(exchange, currency, trade_timestamp, ob_timestamp, open_, high, closing, low,
                       ltc_traded_volume, bid, ask, bid_volume, ask_volume)
        sleep(exchange.rateLimit / 1000)
    except (ccxt.ExchangeError, ccxt.AuthenticationError, ccxt.ExchangeNotAvailable, ccxt.RequestTimeout) as error:
        # NOTE(review): the message announces ``hold`` seconds, but the pause
        # below is the exchange rate limit and no retry happens here --
        # confirm which delay is intended.
        print('Got an error', type(error).__name__, error.args, ', retrying in', hold, 'seconds...')
        sleep(exchange.rateLimit / 1000)
    return trading_pairs
@profile()
def ticker_update(exchange, trading_pairs):
    """Run ``ticker`` once for every symbol in *trading_pairs*.

    A failure for one symbol is printed and followed by a pause of
    ``exchange.rateLimit / 1000`` seconds; the remaining symbols are still
    processed.  Nothing is returned.

    Args:
        exchange: ccxt exchange instance passed through to ``ticker``.
        trading_pairs: Iterable of market symbols.
    """
    # The results used to be appended to a local list that was never read
    # and then deleted.  A local list is released on return anyway, so the
    # list and its ``del`` were dropped.
    try:
        for pair in trading_pairs:
            try:
                data = ticker(exchange, pair)
                # Only report success when ticker handed back a result.
                if data is not None:
                    print("writing data for " + str(pair))
            except Exception as error:
                print(str(error))
                sleep(exchange.rateLimit / 1000)
    except Exception as error:
        # Reached only when iterating ``trading_pairs`` itself fails or the
        # inner handler raises.
        print("error " + str(exchange) + " in ticker fetching. Error: " + str(error))
@profile()
def id_convert(exchange, pair):
    """Build the identifier ``<exchangeName>_<pair>`` for a trading pair.

    Slashes in *pair* become underscores and the pair part is lower-cased;
    the exchange name is used as is.  With ``exchangeName == 'gemini'`` and
    ``pair == 'LTC/USD'`` the result is ``'gemini_ltc_usd'``.

    Args:
        exchange: Object exposing an ``exchangeName`` attribute.
        pair: Market symbol.

    Returns:
        The identifier string.
    """
    # A descriptive local name avoids shadowing the builtin ``id``.
    pair_slug = str(pair).replace("/", "_").lower()
    return str(exchange.exchangeName) + '_' + pair_slug
@profile()
def database_write(exchange, TBL_NAME, trade_timestamp, ob_timestamp, opening, high, close, low, ltc_traded_volume, bid, ask, bid_volume, ask_volume):
    """Insert one ticker row into table *TBL_NAME* and commit.

    Uses the module-level ``cursor`` and ``cnx``.  Any error is printed and
    swallowed so that the polling loop keeps running.

    Args:
        exchange: Unused; kept for interface compatibility.
        TBL_NAME: Table name, with or without surrounding back-quotes
            (``'LTC/USD'`` and ``'`LTC/USD`'`` address the same table).
        trade_timestamp: Value for the ``trade_time_ms`` column.
        ob_timestamp: Value for the ``orderbook_time_ms`` column.
        opening, high, close, low, ltc_traded_volume, bid, ask, bid_volume,
            ask_volume: Values for the columns of the same name.
    """
    try:
        # The table used to be hard-coded as `LTC/USD`, silently ignoring
        # the parameter.  Identifiers cannot be bound as query parameters,
        # so the name is quoted here and embedded back-quotes are doubled.
        table = "`{}`".format(str(TBL_NAME).strip("`").replace("`", "``"))
        add_data = (
            "INSERT INTO {} "
            "(trade_time_ms, orderbook_time_ms, opening, high, close, low, ltc_traded_volume, bid, ask, bid_volume, ask_volume) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        ).format(table)
        data_add = (trade_timestamp, ob_timestamp, opening, high, close, low,
                    ltc_traded_volume, bid, ask, bid_volume, ask_volume)
        cursor.execute(add_data, data_add)
        cnx.commit()
    except Exception as error:
        print("error in database_write function " + str(error))
@profile()
def ticker_delta(exchange, currency):
    """Estimate 1h and 24h price changes from row counts in the ticker table.

    The spacing between the first ten rows is used to work out how many rows
    correspond to one hour and to 24 hours; the close of the row that many
    positions before the last one is compared with the last close.

    Args:
        exchange: Object whose ``database`` attribute provides ``begin()``
            and item access by *currency*.
        currency: Market symbol used as the table key.

    Returns:
        ``(delta1h, delta24h)`` in percent.  An entry is ``None`` when the
        table is too short for that window, and both are ``None`` after any
        error (which is printed).
    """
    try:
        # NOTE(review): begin(), database[currency] and table.find_one() look
        # like the API of the ``dataset`` library; a plain mysql.connector
        # connection presumably does not offer them, in which case this
        # function always ends in the except branch -- confirm what
        # exchange.database is meant to be.
        exchange.database.begin()
        table = exchange.database[currency]

        # COUNT(*) yields a single row.  The former ``SELECT *`` loaded the
        # whole table into the buffered cursor just to read ``rowcount``.
        cursor.execute("SELECT COUNT(*) FROM " + TBL_NAME)
        ticker_length = int(cursor.fetchone()[0])

        if ticker_length > 10:
            time_between_tickers_min = (
                float(table.find_one(id='10')['trade_time_ms'])
                - float(table.find_one(id='1')['trade_time_ms'])
            ) / 10 / 60000
            delta1h_ticker_count = int(time_between_tickers_min * 60)
            delta24h_ticker_count = int(time_between_tickers_min * 60 * 24)
            last_ticker = table.find_one(id=str(ticker_length))['close']
        else:
            delta1h_ticker_count = 0
            delta24h_ticker_count = 0
            last_ticker = 1
        print("delta1h ticker count " + str(delta1h_ticker_count))
        print("delta24h ticker count " + str(delta24h_ticker_count))

        # NOTE(review): int() truncates the historic close before the
        # percentage is computed -- confirm this is intended.
        if 0 < delta24h_ticker_count < ticker_length:
            delta24h_ticker = int(table.find_one(id=str(ticker_length - delta24h_ticker_count))['close'])
            delta24h = ((last_ticker - delta24h_ticker) / last_ticker) * 100
        else:
            delta24h = None
        if 0 < delta1h_ticker_count < ticker_length:
            delta1h_ticker = int(table.find_one(id=str(ticker_length - delta1h_ticker_count))['close'])
            delta1h = ((last_ticker - delta1h_ticker) / last_ticker) * 100
        else:
            delta1h = None
        print(str(currency) + " delta 1h: " + str(delta1h))
        print(str(currency) + " delta 24h: " + str(delta24h))
    except Exception as error:
        print(error)
        delta24h = None
        delta1h = None
    return (delta1h, delta24h)
@profile()
def delta(exchange, currency):
    """Return the percentage change of the latest close over 1h and 24h.

    The latest close is compared with the close of the first stored row
    whose ``trade_time_ms`` is newer than one hour ago and 24 hours ago.
    Rows are read from the module-level ``TBL_NAME`` table through the
    module-level ``cursor``.

    Args:
        exchange: Object whose ``database`` attribute is the connection to
            commit on once the reads are done.
        currency: Unused; kept for interface compatibility.

    Returns:
        ``(delta_1h, delta_24h)`` in percent.  An entry is 0 when no
        reference row exists, when the latest close is missing or zero, or
        when an error occurred (the error is printed).
    """
    delta_1h = 0
    delta_24h = 0
    try:
        # trade_time_ms is compared against millisecond offsets, so "now" is
        # taken as epoch milliseconds (UTC).
        now_ms = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1000)
        timestamp_1h = now_ms - 3600000
        timestamp_24h = now_ms - 24 * 3600000

        # Latest close.  The old code looked up ``id = rowcount`` after a
        # LIMIT 1 query, which always addressed the row with id 1.
        cursor.execute("SELECT close FROM " + TBL_NAME + " ORDER BY id DESC LIMIT 1")
        latest_row = cursor.fetchone()

        # Only one value of one row is needed per window.  The old
        # ``SELECT *`` without LIMIT made the buffered cursor fetch every
        # matching row on each call, and ``row[0]`` was the id column rather
        # than the close.
        reference_query = (
            "SELECT close FROM " + TBL_NAME +
            " WHERE trade_time_ms > %s ORDER BY id ASC LIMIT 1"
        )
        cursor.execute(reference_query, (timestamp_1h,))
        row_1h = cursor.fetchone()
        cursor.execute(reference_query, (timestamp_24h,))
        row_24h = cursor.fetchone()
        exchange.database.commit()

        closing_now = float(latest_row[0]) if latest_row and latest_row[0] else 0.0
        if not closing_now:
            # Empty table or a zero/NULL close: nothing to compare against,
            # and dividing by it would fail.
            print("none value for closing_now")
            return (delta_1h, delta_24h)

        if not row_1h or not row_1h[0]:
            print("none value for result_1h")
        else:
            closing_1h = float(row_1h[0])
            delta_1h = ((closing_now - closing_1h) / closing_now) * 100

        if not row_24h or not row_24h[0]:
            print("none value for result_24h")
        else:
            closing_24h = float(row_24h[0])
            delta_24h = ((closing_now - closing_24h) / closing_now) * 100
    except Exception as error:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        print(exc_type, fname, exc_tb.tb_lineno)
        print("Error in Delta function: " + str(error))
    return (delta_1h, delta_24h)
@profile()
def update_all():
    """Poll the exchange forever and print an object summary after each pass.

    Every iteration runs ``ticker_update`` for the gemini symbols and then
    prints a pympler summary of all objects currently tracked.  The loop
    only ends through an exception such as ``KeyboardInterrupt`` or
    ``SystemExit``, which propagates to the caller.
    """
    # The former ``except (KeyboardInterrupt, SystemExit): raise`` only
    # re-raised and was removed; both still propagate unchanged.
    while True:
        ticker_update(gemini, gemini_currency)
        all_objects = muppy.get_objects()
        sum1 = summary.summarize(all_objects)
        summary.print_(sum1)
        # Release the snapshot right away.  Otherwise the list referencing
        # every live object stays alive through the next pass and is itself
        # included in the next snapshot.
        del all_objects, sum1
# Script entry point: start the endless polling loop.
update_all()