Нужна помощь для повышения pandas эффективности группового - PullRequest
0 голосов
/ 20 апреля 2020

Я работаю с большим набором данных, который расширяется> 50 лет. Каждый год имеет ~ 15 миллионов строк записей с несколькими переменными / столбцами. Мне нужно выполнять групповые операции по месту и времени. Мой код работает очень медленно - для обработки данных за год требуется ~ 5 часов. Я просмотрел несколько постов по многопроцессорности, но, поскольку у меня нет опыта работы с ним, я не уверен, применим ли этот метод к моей проблеме. Я был бы признателен, если бы кто-то мог указать, как я могу сделать код более эффективным.

Вот мой код:

#!/usr/bin/env python
# encoding: utf-8
import numpy as np
import pandas as pd
import datetime
import argparse
from scipy.stats.mstats import hmean

def Nstat(df):
    duMW = [6,7,8,9,30,31,32,33,34,35,98]
    d = {}
    d['NSTN'] = df['STATION'].nunique()
    d['NVIS'] = df['VIS'].count()
    d['NMW']  = df['MW'].count()
    d['NPW']  = df['PW'].count()
    d['NDU']  = df.loc[ isd['RH']<=90,'MW'].isin(duMW).sum()
    d['NDU6']  = (df.loc[ isd['RH']<=90,'MW']==6 ).sum()
    d['NDU7']  = (df.loc[ isd['RH']<=90,'MW']==7 ).sum()
    d['NDU8']  = (df.loc[ isd['RH']<=90,'MW']==8 ).sum()
    d['NDU9']  = (df.loc[ isd['RH']<=90,'MW']==9 ).sum()
    d['NDU30'] = (df.loc[ isd['RH']<=90,'MW']==30).sum()
    d['NDU31'] = (df.loc[ isd['RH']<=90,'MW']==31).sum()
    d['NDU32'] = (df.loc[ isd['RH']<=90,'MW']==32).sum()
    d['NDU33'] = (df.loc[ isd['RH']<=90,'MW']==33).sum()
    d['NDU34'] = (df.loc[ isd['RH']<=90,'MW']==34).sum()
    d['NDU35'] = (df.loc[ isd['RH']<=90,'MW']==35).sum()
    d['NDU98'] = (df.loc[ isd['RH']<=90,'MW']==98).sum()
    d['NDUpst']= (df.loc[ isd['RH']<=90,'PW']==3).sum()
    d['VIS_Hvg'] = hmean(df.loc[df['VIS']>0,'VIS'])
    d['Vi_Avg'] = df['Vi'].mean()

    return pd.Series(d,index=['NSTN','NVIS','NMW','NPW',\
'NDU','NDU6','NDU7','NDU8','NDU9','NDU30','NDU31','NDU32',\
'NDU33','NDU34','NDU35','NDU98','NDUpst','VIS_Hvg','Vi_Avg'])

if __name__ =='__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("start_year",type=int,help='4-digit start year')
    parser.add_argument("end_year",type=int,help='4-digit end year')
    args = parser.parse_args()
    years = np.arange(args.start_year,args.end_year)

    stnall = pd.read_csv('stat/isd_station_list.csv',dtype={'STATION':'str'})
    for iyr,yr in enumerate(years):
        print('process year {:d} at {:s}'.format(yr,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
        isd = pd.read_hdf('hdf/isd_lite_'+str(yr)+'.h5',dtype={'STATION':'str'})
        isd = isd[isd['STATION'].isin(stnall['STATION'])]

        print('>> number of stations: {:d}'.format(isd['STATION'].nunique()))

        isd['YYYYMM'] = pd.to_datetime(isd['DATE'],format='%Y%m%dT%H%M').dt.strftime('%Y%m')
        isd['VIS'] = isd['VIS']/1000.
        isd['Vi'] = isd['VIS'].apply(lambda x: 1/x if x>0 else np.nan)
        isd['DUP'] = np.where(isd['WND']>ut, isd['WND']**3 * (1+ut/isd['WND']) * (1-ut**2/isd['WND']**2),0).round(3)

        print('>> groupby and output at {:s}'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

        Nsrc = isd.groupby(['STATION','SOURCE']).size().unstack(fill_value=0)
        Nsrc.to_csv('stat/yearly/stn_all/Nsrc_'+str(yr)+'.csv')

        Nrep = isd.groupby(['STATION','REPORT_TYPE']).size().unstack(fill_value=0)
        Nrep.to_csv('stat/yearly/stn_all/Nrep_'+str(yr)+'.csv')

        stn_month = isd.groupby(['STATION','YYYYMM']).apply(Nstat).reset_index().astype(dTypes)
        stn_month.to_csv('stat/yearly/stn_all/stn_month_'+str(yr)+'.csv',index=False,float_format='%.3f')

Последняя операция группового режима (по STATION и YYYYMM) самое трудоемкое.

Редактировать - у меня довольно хорошая рабочая станция (256 ядер) и я хочу максимально использовать ее.

...