У меня много кадров с пандами. Я получаю информацию от них и затем возвращаю некоторую информацию. Поскольку функция, которую я использую, занимает много времени, около 5 секунд на цикл, и я делаю много циклов, я хочу ее парализовать.
Сначала я попробовал процесс из многопроцессорного модуля. Проблема заключалась в том, что у меня много данных только для чтения, хранящихся в памяти, и разные процессы не были хорошей идеей, так как использование другой памяти.
Поэтому я решил попробовать темы. Я хочу вызывать функцию много раз, когда вход разбит на части. Таким образом, вместо того, чтобы обращаться к приблизительно 10 000 фрейм данных за один раз, я пытаюсь получить доступ к 1 000 фрейм данных за вызов функции, но при этом запускаю 12 потоков.
Моя идея была:
def pare():
relist = list(reduced_stocks.keys())
sublist = [relist[x:x+332] for x in range(0, len(relist), 332)]
data = [(1.4, 2500, 8, x) for x in sublist]
data = [x for x in sublist]
threads = list()
from threading import Thread
for i in range(12):
process = Thread(target=find_something2, args=(1.4,2500,8,data[i],i,results))
process.start()
threads.append(process)
for process in threads:
process.join()
Но, как я понял, время было почти одинаковым как для параллельного, так и для последовательного выполнения. Буду ли я иметь лучшие результаты, используя пул потоков? Проблема, как я уже сказал, заключается в том, что функция, которую я пытаюсь запустить параллельно, должна возвращать список. Таким образом, в приведенном выше примере я сохраняю каждый результат из потока в глобальной переменной списка.
Я также добавляю функцию find_something и функцию worth_buy, которые выполняются. Я использую это для current_day> '2005-01-01', поэтому не обращайте внимания на другие ссылки.
def find_something2(threl=2.0, my_limit=150, far=365, mystocks=None,index=None,results=None):
""" Find stocks that are worth buying"""
global current_date, total_money, min_date, current_name, dates_dict, mylist, min_date_sell, reduced_stocks
worthing = list()
if current_date < '2005-01-01':
for stock in mylist:
if dates_dict[stock][0] <= min_date: # don't open all files
frame = open_txt(stock)
temp = frame.loc[current_date:end_date]
if not temp.empty:
mydate = temp.head(far).Low.idxmin()
my_min = temp.head(far).Low.min()
if total_money >= my_min > 0: # find the min date at four months
ans, res, when_sell, total, income = worth_buy(stock, frame, mydate, 'Low',
thres=threl,
sell_limit=my_limit)
if ans:
if current_date > '2015-01-01':
if total >= 1000:
worthing.append([mydate, stock, res, when_sell, total, income])
elif current_date > '2000-01-01':
if income > 3 * 10 ** 6:
worthing.append([mydate, stock, res, when_sell, total, income])
elif current_date > '1985-01-01':
if income > 1.5 * 10 ** 6:
worthing.append([mydate, stock, res, when_sell, total, income])
else:
worthing.append([mydate, stock, res, when_sell, total, income])
else:
for stock in mystocks:
frame = reduced_stocks[stock]
temp = frame.loc[current_date:end_date]
if not temp.empty:
mydate = temp.head(far).Low.idxmin()
my_min = temp.head(far).Low.min()
if total_money >= my_min > 0: # find the min date at four months
ans, res, when_sell, total, income = worth_buy(stock, frame, mydate, 'Low',
thres=threl, sell_limit=my_limit)
if ans:
if income > 3 * 10 ** 6:
worthing.append([mydate, stock, res, when_sell, total, income])
if current_date > '1990-01-01':
results[index] = sorted(worthing, key=itemgetter(0))
elif current_date > '1985-01-01':
return sorted(worthing, key=itemgetter(0))
else:
answer = sorted(worthing, key=itemgetter(5), reverse=True)
return answer[::11]
def worth_buy(stock, frame, date, code, thres=2.0, sell_limit=0):
""" Checks if the stock is worth buying"""
global keep_time, end_date, total_money
over = find_limit(date, sell_limit)
checking = frame.loc[date:over]
if checking.empty:
return False, 0, '', 0, 0 # anything at date limits
if stock not in purchased:
when_sell = ((checking.High / frame.at[date, code]).idxmax())
else:
possible_dates = purchased[stock][3].split(sep='/')
other_sell = find_limit(possible_dates[-1], 1)
checking2 = checking.loc[other_sell:]
if checking2.empty:
return False, 0, '', 0, 0 # anything at date limits
when_sell = (checking2.High / frame.at[date, code]).idxmax()
buy_value = frame.at[date, code]
sell_value = frame.at[when_sell, code]
total = buy_total(frame, date, code, when_sell)
ans = sell_value / buy_value # mporw na exw sunartisi
income = (sell_value - buy_value) * total
if when_sell <= end_date:
return ans >= thres, ans, when_sell, total, income
else:
return False, 0, '', 0, 0
Есть ли шанс, что время не улучшится, потому что большая часть времени тратится на чтение данных вместо их обработки? ? Я думаю, но не уверен, что, поскольку я просто читаю, а не пишу, у меня должен быть лучший результат при использовании многопоточности.
Еще одна важная вещь заключается в том, что функция find_something и другие не обновляют глобальные значения, все на этом этапе доступно только для чтения. Я сохранил в памяти все фреймы данных, к которым у меня есть доступ по их имени, которое я сохранил в словаре.
Заранее спасибо
Отредактируйте, как я храню фреймы данных в словаре:
def initialize():
global current_date, file_names, current_name, dates_dict, mylist, sell_when, min_date_sell
worthing = 1
for stock in file_names:
if getsize('C:/Users/tzagk/Downloads/Stocks/' + stock) > 0:
frame = pd.read_csv('C:/Users/tzagk/Downloads/Stocks/' + stock, dtype=size_dict, usecols=use_cols, header=0,
index_col=0)
dates_dict[stock] = [frame.index[0], frame] # store start_time, data frame
mylist.append(stock)
if frame.index[0] <= min_date: #
check_date = frame.head(365).Low.idxmin()
if frame.head(365).Low.min() <= 1: # find the min date at four months
ans, res, when_sell, total, income = worth_buy(stock, frame, check_date,
code='Low', thres=1.2,
sell_limit=3000)
if ans:
if res > worthing: # check best worth
worthing = res
current_date = check_date # set start date
current_name = stock
min_date_sell = when_sell