Python: Joblib для многопроцессорной обработки - PullRequest
1 голос
/ 07 августа 2020

Итак, у меня есть следующие функции:

def make_event_df(match_id, path):
    '''
    Function for making event dataframe.
    
    Argument:
        match_id -- int, the required match id for which event data will be constructed.
        path -- str, path to .json file containing event data.
    
    Returns:
        df -- pandas dataframe, the event dataframe for the particular match.
    '''
    ## read in the json file
    event_json = json.load(open(path, encoding='utf-8'))
    
    ## normalize the json data
    df = json_normalize(event_json, sep='_')
    
    return df

def full_season_events(comp_name, match_df, match_ids, path):
    '''
    Function to make event dataframe for a full season.
    
    Arguments:
        comp_name -- str, competition name + season name
        match_df -- pandas dataframe, containing match-data
        match_id -- list, list of match id.
        path -- str, path to directory where .json file is listed.
                e.g. '../input/Statsbomb/data/events'
    
    Returns:
        event_df -- pandas dataframe, containing event data for the whole season.
    '''
    ## init an empty dataframe
    event_df = pd.DataFrame()

    for match_id in tqdm(match_ids, desc=f'Making Event Data For {comp_name}'):
        ## .json file
        temp_path = path + f'/{match_id}.json'

        temp_df = make_event_df(match_id, temp_path)
        event_df = pd.concat([event_df, temp_df], sort=True)
        
    return event_df   

Теперь я запускаю этот фрагмент кода, чтобы получить фрейм данных:

comp_id = 11
season_id = 1
path = f'../input/Statsbomb/data/matches/{comp_id}/{season_id}.json'

match_df = get_matches(comp_id, season_id, path)

comp_name = match_df['competition_name'].unique()[0] + '-' + match_df['season_name'].unique()[0]
match_ids = list(match_df['match_id'].unique())
path = f'../input/Statsbomb/data/events'

event_df = full_season_events(comp_name, match_df, match_ids, path)

Приведенный выше фрагмент кода дает мне этот вывод :

Making Event Data For La Liga-2017/2018: 100%|██████████| 36/36 [00:29<00:00,  1.20it/s]

Как я могу использовать многопроцессорность, чтобы ускорить процесс, т.е. как я могу использовать match_ids в full_season_events(), чтобы быстрее получать данные из файла JSON (используя многопроцессорность). Я очень новичок в концепции joblib и многопроцессорной обработки. Может ли кто-нибудь сказать, какие изменения мне нужно внести в эти функции, чтобы получить требуемый результат?

1 Ответ

2 голосов
/ 07 августа 2020

Вам здесь не нужен joblib, просто подойдет multiprocessing.

  • Я использую imap_unordered, так как он быстрее, чем imap или map, но не сохраняет порядок (каждый работник может получать и отправлять задания вне очереди). Несоблюдение порядка, похоже, не имеет значения, так как вы все равно sort=True.
    • Поскольку я использую imap_unordered, требуется дополнительная jobs доработка; нет istarmap_unordered, который бы распаковывал параметры, поэтому нам нужно сделать это самостоятельно.
  • Если у вас много match_ids, все можно ускорить, например, с chunksize=10 до imap_unordered; это означает, что каждый рабочий процесс будет получать по 10 заданий за раз, и они также будут возвращать 10 заданий за раз. Это быстрее, поскольку на синхронизацию и сериализацию процессов тратится меньше времени, но, с другой стороны, индикатор выполнения TQDM будет обновляться реже.

Как обычно, приведенный ниже код имеет кодировку dry и может не работает OOTB.

import multiprocessing


def make_event_df(job):
    # Unpack parameters from job tuple
    match_id, path = job
    with open(path) as f:
        event_json = json.load(f)
    # Return the match id (if required) and the result.
    return (match_id, json_normalize(event_json, sep="_"))


def full_season_events(comp_name, match_df, match_ids, path):
    event_df = pd.DataFrame()

    with multiprocessing.Pool() as p:
        # Generate job tuples
        jobs = [(match_id, path + f"/{match_id}.json") for match_id in match_ids]
        # Run & get results from multiprocessing generator
        for match_id, temp_df in tqdm(
            p.imap_unordered(make_event_df, jobs),
            total=len(jobs),
            desc=f"Making Event Data For {comp_name}",
        ):
            event_df = pd.concat([event_df, temp_df], sort=True)

    return event_df
...