Использование потоков Python для доступа к большому количеству данных - PullRequest
0 голосов
/ 24 октября 2019

У меня много кадров с пандами. Я получаю информацию от них и затем возвращаю некоторую информацию. Поскольку функция, которую я использую, занимает много времени, около 5 секунд на цикл, и я делаю много циклов, я хочу ее парализовать.

Сначала я попробовал процесс из многопроцессорного модуля. Проблема заключалась в том, что у меня много данных только для чтения, хранящихся в памяти, и разные процессы не были хорошей идеей, так как использование другой памяти.

Поэтому я решил попробовать темы. Я хочу вызывать функцию много раз, когда вход разбит на части. Таким образом, вместо того, чтобы обращаться к приблизительно 10 000 фрейм данных за один раз, я пытаюсь получить доступ к 1 000 фрейм данных за вызов функции, но при этом запускаю 12 потоков.

Моя идея была:

def pare():        
relist = list(reduced_stocks.keys())
sublist = [relist[x:x+332] for x in range(0, len(relist), 332)]  
data = [(1.4, 2500, 8, x)  for x in sublist]
data = [x  for x in sublist]
threads = list()
from threading import Thread
for i in range(12):
    process = Thread(target=find_something2, args=(1.4,2500,8,data[i],i,results))
    process.start()
    threads.append(process)
for process in threads:
    process.join()

Но, как я понял, время было почти одинаковым как для параллельного, так и для последовательного выполнения. Буду ли я иметь лучшие результаты, используя пул потоков? Проблема, как я уже сказал, заключается в том, что функция, которую я пытаюсь запустить параллельно, должна возвращать список. Таким образом, в приведенном выше примере я сохраняю каждый результат из потока в глобальной переменной списка.

Я также добавляю функцию find_something и функцию worth_buy, которые выполняются. Я использую это для current_day> '2005-01-01', поэтому не обращайте внимания на другие ссылки.

def find_something2(threl=2.0, my_limit=150, far=365, mystocks=None,index=None,results=None):
""" Find stocks that are worth buying"""
global current_date, total_money, min_date, current_name, dates_dict, mylist, min_date_sell, reduced_stocks
worthing = list()
if current_date < '2005-01-01':
    for stock in mylist:
        if dates_dict[stock][0] <= min_date:  # don't open all files
            frame = open_txt(stock)
            temp = frame.loc[current_date:end_date]
            if not temp.empty:
                mydate = temp.head(far).Low.idxmin()
                my_min = temp.head(far).Low.min()
                if total_money >= my_min > 0:  # find the min date at four months
                    ans, res, when_sell, total, income = worth_buy(stock, frame, mydate, 'Low',
                                                                   thres=threl,
                                                                   sell_limit=my_limit)
                    if ans:
                        if current_date > '2015-01-01':
                            if total >= 1000:
                                worthing.append([mydate, stock, res, when_sell, total, income])
                        elif current_date > '2000-01-01':
                            if income > 3 * 10 ** 6:
                                worthing.append([mydate, stock, res, when_sell, total, income])
                        elif current_date > '1985-01-01':
                            if income > 1.5 * 10 ** 6:
                                worthing.append([mydate, stock, res, when_sell, total, income])
                        else:
                            worthing.append([mydate, stock, res, when_sell, total, income])
else:
    for stock in mystocks:
        frame = reduced_stocks[stock]
        temp = frame.loc[current_date:end_date]
        if not temp.empty:
            mydate = temp.head(far).Low.idxmin()
            my_min = temp.head(far).Low.min()
            if total_money >= my_min > 0:  # find the min date at four months
                ans, res, when_sell, total, income = worth_buy(stock, frame, mydate, 'Low',
                                                               thres=threl, sell_limit=my_limit)
                if ans:
                    if income > 3 * 10 ** 6:
                        worthing.append([mydate, stock, res, when_sell, total, income])
if current_date > '1990-01-01':
    results[index] = sorted(worthing, key=itemgetter(0))
elif current_date > '1985-01-01':
    return sorted(worthing, key=itemgetter(0))
else:
    answer = sorted(worthing, key=itemgetter(5), reverse=True)
    return answer[::11]

def worth_buy(stock, frame, date, code, thres=2.0, sell_limit=0):
""" Checks if the stock is worth buying"""
global keep_time, end_date, total_money
over = find_limit(date, sell_limit)
checking = frame.loc[date:over]
if checking.empty:
    return False, 0, '', 0, 0  # anything at date limits
if stock not in purchased:
    when_sell = ((checking.High / frame.at[date, code]).idxmax())
else:
    possible_dates = purchased[stock][3].split(sep='/')
    other_sell = find_limit(possible_dates[-1], 1)
    checking2 = checking.loc[other_sell:]
    if checking2.empty:
        return False, 0, '', 0, 0  # anything at date limits
    when_sell = (checking2.High / frame.at[date, code]).idxmax()
buy_value = frame.at[date, code]
sell_value = frame.at[when_sell, code]
total = buy_total(frame, date, code, when_sell)
ans = sell_value / buy_value  # mporw na exw sunartisi
income = (sell_value - buy_value) * total
if when_sell <= end_date:
    return ans >= thres, ans, when_sell, total, income
else:
    return False, 0, '', 0, 0

Есть ли шанс, что время не улучшится, потому что большая часть времени тратится на чтение данных вместо их обработки? ? Я думаю, но не уверен, что, поскольку я просто читаю, а не пишу, у меня должен быть лучший результат при использовании многопоточности.

Еще одна важная вещь заключается в том, что функция find_something и другие не обновляют глобальные значения, все на этом этапе доступно только для чтения. Я сохранил в памяти все фреймы данных, к которым у меня есть доступ по их имени, которое я сохранил в словаре.

Заранее спасибо

Отредактируйте, как я храню фреймы данных в словаре:

def initialize():
global current_date, file_names, current_name, dates_dict, mylist, sell_when, min_date_sell
worthing = 1
for stock in file_names:
    if getsize('C:/Users/tzagk/Downloads/Stocks/' + stock) > 0:
        frame = pd.read_csv('C:/Users/tzagk/Downloads/Stocks/' + stock, dtype=size_dict, usecols=use_cols, header=0,
                            index_col=0)
        dates_dict[stock] = [frame.index[0], frame]  # store start_time, data frame
        mylist.append(stock)
        if frame.index[0] <= min_date:  #
            check_date = frame.head(365).Low.idxmin()
            if frame.head(365).Low.min() <= 1:  # find the min date at four months
                ans, res, when_sell, total, income = worth_buy(stock, frame, check_date,
                                                               code='Low', thres=1.2,
                                                               sell_limit=3000)
                if ans:
                    if res > worthing:  # check best worth
                        worthing = res
                        current_date = check_date  # set start date
                        current_name = stock
                        min_date_sell = when_sell
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...