Multiprocessing.apply на Python 3 - PullRequest
       8

Multiprocessing.apply на Python 3

0 голосов
/ 23 октября 2019

У меня есть функция, которая зацикливается, используя значения из словаря. Я хочу разделить свои ключи, так что я могу разбить мой на части, равные моему процессору. Моя функция:

def find_something2(new2, threl=2.0, my_limit=150, far=365):
""" Find stocks tha are worth buying"""
global current_date, total_money, min_date, current_name, dates_dict, mylist, min_date_sell, reduced_stocks
worthing = list()
for stock in new2:
    frame = reduced_stocks[stock]
    temp = frame.loc[current_date:end_date]
    if not temp.empty:
        mydate = temp.head(far).Low.idxmin()
        if mydate <= min_date_sell:
            my_min = temp.head(far).Low.min()
            if total_money >= my_min > 0:  # find the min date at four months
                ans, res, when_sell, total, income = worth_buy(stock, frame, mydate, 'Low',
                                                               thres=threl, sell_limit=my_limit)
                if ans:
                    if income > 3 * 10 ** 6:
                        worthing.append([mydate, stock, res, when_sell, total, income])
if current_date > '1990-01-01':
    return sorted(worthing, key=itemgetter(0))
elif current_date > '1985-01-01':
    return sorted(worthing, key=itemgetter(0))
else:
    answer = sorted(worthing, key=itemgetter(5), reverse=True)
    return answer[::11]

, поэтому я попробовал:

import multiprocessing as mp
result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    global result_list
    result_list.append(result)
def apply_async_with_callback():
    global reduced_stocks
    temp = list(reduced_stocks.keys())
    temp1 = temp[0:1991]
    temp2 = temp[1991:]
    temp = [temp1, temp2]
    pool = mp.Pool(2)
    for i in temp:
        pool.apply_async(find_something2, args=(i, 1.1, 2200, 1,), callback=log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
        apply_async_with_callback()

Это правильный путь?

Я также пробовал темы, но процессоридет максимум на 15%, используя 12 потоков (у меня 6 ядер Intel)

def pare():        
relist = list(reduced_stocks.keys())
sublist = [relist[x:x+332] for x in range(0, len(relist), 332)]  
data = [x  for x in sublist]
threads = list()
from threading import Thread
for i in range(12):
    process = Thread(target=find_something2, args=(1.4,2500,8,data[i],i,results))
    process.start()
    threads.append(process)
for process in threads:
    process.join()

1 Ответ

0 голосов
/ 23 октября 2019

Один из способов сделать многопроцессорную обработку - создать Pool и передать ему подготовленные данные. Дождитесь вычисления и обработайте результаты. Код подсказывает, как это сделать.

# setup the function so it gets everything from arguments
def find_something2(new2, threl, my_limit, far, current_date, total_money, min_date_sell, reduced_stocks, end_date):
    # ....
    pass

# prepare the data
# replace the a1, a2 ... with the actual parameters your function takes
data = [(a1, a2, a3, ...) for your_data in your_dict]

import multiprocessing as mp
with mp.Pool() as pool:
    results = pool.starmap(find_something2, data)

    print(results)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...