Многопроцессорность в python - процессы не закрываются после завершения - PullRequest
0 голосов
/ 11 ноября 2018

У меня есть пул процессов в python, который запускает процессы как обычно, однако я только что понял, что эти процессы не закрываются после завершения (я знаю, что они завершены, так как последний оператор является записью в файл). Ниже приведен код с примером функции ppp:

from multiprocessing import Pool
import itertools

def ppp(element):
    window,day = element
    print(window,day)
    time.sleep(10)

if __name__ == '__main__':  ##The line marked
    print('START')
    start_time = current_milli_time()
    days = ['0808', '0810', '0812', '0813', '0814', '0817', '0818', '0827']
    windows = [1000,2000,3000,4000,5000,10000,15000, 20000,30000,60000,120000,180000]
    processes_args = list(itertools.product(windows, days))        
    pool = Pool(8) 
    results = pool.map(ppp, processes_args)
    pool.close() 
    pool.join() 
    print('END', current_milli_time()-start_time)

Я работаю на Linux, Ubuntu 16.04. Все работало нормально, прежде чем я добавил строку, отмеченную в примере. Мне интересно, может ли это поведение быть связано с отсутствием оператора return. Во всяком случае, это то, что выглядит как мой «htop»: enter image description here Как видите, ни один процесс не закрыт, но все завершили свою работу.

Я обнаружил, что связанный с этим вопрос: Python Multiprocessing pool.close () и join () не закрывают процессы , однако я не понял, если решение этой проблемы заключается в использовании map_async вместо map .

РЕДАКТИРОВАТЬ: реальный код функции:

def process_day(element):
    window,day = element
    noise = 0.2
    print('Processing day:', day,', window:', window)
    individual_files = glob.glob('datan/'+day+'/*[0-9].csv')
    individual = readDataset(individual_files)
    label_time = individual.loc[(individual['LABEL_O'] != -2) | (individual['LABEL_F'] != -2), 'TIME']
    label_time = list(np.unique(list(label_time)))
    individual = individual[individual['TIME'].isin(label_time)]
    #Saving IDs for further processing
    individual['ID'] = individual['COLLAR']
    #Time variable in seconds for aggregation and merging
    individual['TIME_S'] = individual['TIME'].copy()
    noise_x = np.random.normal(0,noise,len(individual))
    noise_y = np.random.normal(0,noise,len(individual))
    noise_z = np.random.normal(0,noise,len(individual))
    individual['X_AXIS'] = individual['X_AXIS'] + noise_x
    individual['Y_AXIS'] = individual['Y_AXIS'] + noise_y
    individual['Z_AXIS'] = individual['Z_AXIS'] + noise_z
    #Time syncronization (applying milliseconds for time series processing)
    print('Time syncronization:')
    with progressbar.ProgressBar(max_value=len(individual.groupby('ID'))) as bar:
        for baboon,df_baboon in individual.groupby('ID'):
            times = list(df_baboon['TIME'].values)
            d = Counter(times)
            result = []
            for timestamp in np.unique(times):
                for i in range(0,d[timestamp]):
                    result.append(str(timestamp+i*1000/d[timestamp]))
            individual.loc[individual['ID'] == baboon,'TIME'] = result
            bar.update(1)

    #Time series process
    ts_process = time_series_processing(window, 'TIME_S', individual, 'COLLAR', ['COLLAR', 'TIME', 'X_AXIS','Y_AXIS','Z_AXIS'])
    #Aggregation and tsfresh
    ts_process.do_process()
    individual = ts_process.get_processed_dataframe()
    individual.to_csv('noise2/processed_data/'+str(window)+'/agg/'+str(day)+'.csv', index = False)
    #NEtwork inference process
    ni = network_inference_process(individual, 'TIME_S_mean')
    #Inference
    ni.do_process()
    final = ni.get_processed_dataframe()
    final.to_csv('noise2/processed_data/'+str(window)+'/net/'+str(day)+'.csv', index = False)
    #Saving not aggregated ground truth
    ground_truth = final[['ID_mean', 'TIME_S_mean', 'LABEL_O_values', 'LABEL_F_values']].copy()
    #Neighbor features process
    neighbors_features_f = ni.get_neighbor_features(final, 'TIME_S_mean', 'ID_mean')
    neighbors_features_f = neighbors_features_f.drop(['LABEL_O_values_n', 'LABEL_F_values_n'], axis=1)
    neighbors_features_f.to_csv('noise2/processed_data/'+str(window)+'/net/'+str(day)+'_neigh.csv', index = False)
    # Final features dataframe
    final_neigh = pd.merge(final, neighbors_features_f,  how='left', left_on=['TIME_S_mean','ID_mean'], right_on = ['TIME_S_mean_n','BABOON_NODE_n'])
    final_neigh.to_csv('noise2/processed_data/'+str(window)+'/complete/'+str(day)+'.csv', index = False)
    return

Итак, как вы можете видеть, последнее утверждение является записью в файл, и оно выполняется всеми процессами, на самом деле я не думаю, что проблема заключается в этой функции.

...