Многопроцессорность Python, функции с аргументами - PullRequest
0 голосов
/ 01 июня 2019

У меня есть программа, которая имитирует весь бейсбольный сезон, но выполняет много вычислений за игру, поэтому каждая игра занимает около 30 секунд.С 2430 играми в сезоне, программа занимает около 20 часов в сезон.Очевидно, я хотел бы ускорить это, поэтому самое непосредственное решение похоже на многопроцессорность.Я мог бы вручную разбить его на группы по ~ 600 и запустить четыре процесса, но я хотел бы выяснить, как работает модуль многопроцессорной обработки.

Вот что я пробовал до сих пор, но, очевидно, это не так.t работа.

def test_func():
    algorithm_selection = 1

    # Create sqlite database connection
    conn = sqlite3.connect('C:/F5 Prediction Engine/sqlite3/Version 2/statcast_db.db')
    c = conn.cursor()

    season = input('Year to simulate: ')
    c.execute('SELECT * FROM gamelogs_' + season)
    season_games = c.fetchall()

    game_num = 0
    for game in season_games:

        game_num = game_num + 1
        #Get away lineup in terms of MLB IDs
        away_lineup = ConvertLineup(game[105], game[108], game[111], game[114], game[117], game[120], game[123], game[126], game[129])
        #Get home lineup in terms of MLB IDs
        home_lineup = ConvertLineup(game[132], game[135], game[138], game[141], game[144], game[147], game[150], game[153], game[156])
        #Get away starting pitcher and hand in terms of MLB ID
        away_pitcher_results = GetPitcherIDandHand(game[101])
        away_pitcher_id = away_pitcher_results[0][0]
        away_pitcher_hand = away_pitcher_results[0][1]
        #Get home starting pitcher and hand in terms of MLB ID
        home_pitcher_results = GetPitcherIDandHand(game[103])
        home_pitcher_id = home_pitcher_results[0][0]
        home_pitcher_hand = home_pitcher_results[0][1]
        #Get the date of the game
        today_date = game[0]

        if algorithm_selection == 1:
            #Check if the current game has already been evaluated and entered into the database
            c.execute('SELECT * FROM pemstein_results_' + season + ' WHERE date = "' + game[0] + '" AND away_team = "' + game[3] + '" AND home_team = "' + game[6] + \
                  '" AND away_team_score = "' + game[9] + '" AND home_team_score = "' + game[10] + '"')
            check_results = c.fetchall()
            if len(check_results) == 0:
                exp_slgs = PemsteinSimulation(home_pitcher_id, away_pitcher_id, season, home_pitcher_hand, away_pitcher_hand, home_lineup, away_lineup, game[0])
                if exp_slgs[2] == 0: #if both pitches had at least 300 PAs to use for simulation
                    c.execute([long string to insert results into database])
                conn.commit()
                print('Game ' + str(game_num) + ' finished.')
                if exp_slgs[2] == 1: #if one of the pitches did not have enough PAs to qualify
                    c.execute([long string to insert results into database])
                    conn.commit()
                    print('Game ' + str(game_num) + ' finished.')
            if len(check_results) > 0:
                print('Game ' + str(game_num) + ' has already been evaluated.')

from multiprocessing import Process
import os

processes = []

for i in range(0, os.cpu_count()):
    print('Registering process %d' % i)
    processes.append(Process(target=test))

for process in processes:
    process.start()

for process in processes:
    process.join()

==================

Редактировать: новый код

#Child Process
def simulate_games(games_list, counter, lock):
    while(1):
        # Create sqlite database connection
        conn = sqlite3.connect('C:/F5 Prediction Engine/sqlite3/Version 2/statcast_db.db')
        c = conn.cursor()

        #acquire the lock which grants access to the shared variable
        with lock:

            #check the termination condition
            if counter >= len(games_list):
                break

            #get the game_num and game to simulate
            game_num = counter.value
            game_to_simulate = game_list[counter.value]

            #update the counter for the next process
            counter.value += 1

        #Do simulation
        game_num = 0


        game_num = game_num + 1
        #Get away lineup in terms of MLB IDs
        away_lineup = ConvertLineup(game_to_simulate[105], game_to_simulate[108], game_to_simulate[111], game_to_simulate[114], game_to_simulate[117], game_to_simulate[120], game_to_simulate[123], game_to_simulate[126], game_to_simulate[129])
        #Get home lineup in terms of MLB IDs
        home_lineup = ConvertLineup(game_to_simulate[132], game_to_simulate[135], game_to_simulate[138], game_to_simulate[141], game_to_simulate[144], game_to_simulate[147], game_to_simulate[150], game_to_simulate[153], game_to_simulate[156])
        #Get away starting pitcher and hand in terms of MLB ID
        away_pitcher_results = GetPitcherIDandHand(game[101])
        away_pitcher_id = away_pitcher_results[0][0]
        away_pitcher_hand = away_pitcher_results[0][1]
        #Get home starting pitcher and hand in terms of MLB ID
        home_pitcher_results = GetPitcherIDandHand(game[103])
        home_pitcher_id = home_pitcher_results[0][0]
        home_pitcher_hand = home_pitcher_results[0][1]
        #Get the date of the game
        today_date = game_to_simulate[0]
        if algorithm_selection == 1:
            #Check if the current game has already been evaluated and entered into the database
            c.execute('SELECT * FROM pemstein_results_' + season + ' WHERE date = "' + game_to_simulate[0] + '" AND away_team = "' + game_to_simulate[3] + '" AND home_team = "' + game_to_simulate[6] + \
                      '" AND away_team_score = "' + game_to_simulate[9] + '" AND home_team_score = "' + game_to_simulate[10] + '"')
            check_results = c.fetchall()
            if len(check_results) == 0:
                exp_slgs = PemsteinSimulation(home_pitcher_id, away_pitcher_id, season, home_pitcher_hand, away_pitcher_hand, home_lineup, away_lineup, game_to_simulate[0])
                if exp_slgs[2] == 0: #if both pitches had at least 300 PAs to use for simulation
                    c.execute('long sql')
                    conn.commit()
                    print('Game ' + str(game_num) + ' finished.')
                if exp_slgs[2] == 1: #if one of the pitches did not have enough PAs to qualify
                    c.execute('long sql')
                    conn.commit()
                    print('Game ' + str(game_num) + ' finished.')
            if len(check_results) > 0:
                print('Game ' + str(game_num) + ' has already been evaluated.')


if __name__ == "__main__":
    # Create sqlite database connection
    conn = sqlite3.connect('C:/F5 Prediction Engine/sqlite3/Version 2/statcast_db.db')
    c = conn.cursor()

    #Query all games for season to be simulated
    season = int(input('Year to simulate: '))
    c.execute('SELECT * FROM gamelogs_' + str(season))
    season_games = c.fetchall()

    algorithmSelection = 1 

    if algorithmSelection == 1:
        PemsteinSQLresults(str(season))

    counter = mp.Value('i', 0)
    lock = mp.Lock()
    children = []
    for i in range(os.cpu_count()):
        children.append(mp.Process(target=simulate_games, args=(season_games, counter, lock)))

    for child in children:
        child.start()

    for child in children:
        child.join()

Ошибка:

Traceback (most recent call last):
  File "C:\F5 Prediction Engine\Version 2\SimulateSeason v2.py", line 126, in <module>
    child.start()
  File "C:\Python\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Python\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe

=============

Так что я пошел на этот сайт , чтобы просмотреть некоторые вещи, и попробовал новыйскрипт со следующим кодом, который я скопировал с сайта:

import mp

def worker(num):
    """thread worker function"""
    print('Worker:' + num)
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = mp.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

Но он также ничего не делает.На сайте написано, что он должен печатать Worker:0 Worker:1 и т. Д., Но я не получаю отпечатки.Возможно ли, что на моей машине что-то не так?

1 Ответ

1 голос
/ 01 июня 2019

Мне кажется, что вы просто пытались создать новый процесс для каждого процессора и заставить их запускать ту же функцию, что вы написали сначала, однако, если вы хотите работать с процессами, вам придется адаптировать его и обрабатывать процесс Synchonization.

В качестве примера у вас может быть мастер-процесс, который запрашивает у пользователя сезонный год, извлекает все игры за этот год, а затем дочерние процессы читают из полученного массива. Смотрите следующий пример:

# Parent Process
import multiprocessing as mp

# establish db connection [ ... ]

season = int(input("Year to simulate: "))
c.execute('SELECT * FROM gamelogs_' + season)
season_games = c.fetchall()

counter = mp.Value("i", 0)
lock = mp.Lock()
children = []
for i in range(os.cpu_count()):
    children.append(mp.Process(target=simulate_games, args=(season_games, counter, lock,)))

for child in children:
    child.start()

for child in children:
    child.join()

# Child Process
def simulate_games(games_list, counter, lock):
    while(1):
        # acquire the lock which grants the access to the shared variable
        with lock:

            # check the termination condition
            if counter.value >= len(games_list):
                break

            # get the game_num and the game to simulate
            game_num = counter.value
            game_to_simulate = games_list[counter.value]

            # update counter for the next process
            counter.value += 1

        #  Do simulation here

То, что мы имеем выше, это родительский процесс, который в основном подготавливает некоторые данные и создает новые дочерние процессы.

Счетчик реализован с помощью специального класса, т.е. Значение , которое используется для разделения скалярных значений между процессами; Lock по сути является мьютексом, который мы используем для синхронизации доступа к переменной counter и предотвращения одновременного доступа: обратите внимание, что вы могли бы использовать Lock, который автоматически создается внутри разделяемой переменной counter, но я подумал было бы легче понять, разделив их.

Дочерние процессы сначала получают блокировку, читают значение счетчика и увеличивают его, затем переходят к своему обычному поведению, имитируя игры

...