У меня есть пул процессов в python, который запускает процессы как обычно, однако я только что понял, что эти процессы не закрываются после завершения (я знаю, что они завершены, так как последний оператор является записью в файл).
Ниже приведен код с примером функции ppp:
from multiprocessing import Pool
import itertools
def ppp(element):
window,day = element
print(window,day)
time.sleep(10)
if __name__ == '__main__': ##The line marked
print('START')
start_time = current_milli_time()
days = ['0808', '0810', '0812', '0813', '0814', '0817', '0818', '0827']
windows = [1000,2000,3000,4000,5000,10000,15000, 20000,30000,60000,120000,180000]
processes_args = list(itertools.product(windows, days))
pool = Pool(8)
results = pool.map(ppp, processes_args)
pool.close()
pool.join()
print('END', current_milli_time()-start_time)
Я работаю на Linux, Ubuntu 16.04. Все работало нормально, прежде чем я добавил строку, отмеченную в примере. Мне интересно, может ли это поведение быть связано с отсутствием оператора return. Во всяком случае, это то, что выглядит как мой «htop»:
Как видите, ни один процесс не закрыт, но все завершили свою работу.
Я обнаружил, что связанный с этим вопрос: Python Multiprocessing pool.close () и join () не закрывают процессы , однако я не понял, если решение этой проблемы заключается в использовании map_async вместо map .
РЕДАКТИРОВАТЬ: реальный код функции:
def process_day(element):
window,day = element
noise = 0.2
print('Processing day:', day,', window:', window)
individual_files = glob.glob('datan/'+day+'/*[0-9].csv')
individual = readDataset(individual_files)
label_time = individual.loc[(individual['LABEL_O'] != -2) | (individual['LABEL_F'] != -2), 'TIME']
label_time = list(np.unique(list(label_time)))
individual = individual[individual['TIME'].isin(label_time)]
#Saving IDs for further processing
individual['ID'] = individual['COLLAR']
#Time variable in seconds for aggregation and merging
individual['TIME_S'] = individual['TIME'].copy()
noise_x = np.random.normal(0,noise,len(individual))
noise_y = np.random.normal(0,noise,len(individual))
noise_z = np.random.normal(0,noise,len(individual))
individual['X_AXIS'] = individual['X_AXIS'] + noise_x
individual['Y_AXIS'] = individual['Y_AXIS'] + noise_y
individual['Z_AXIS'] = individual['Z_AXIS'] + noise_z
#Time syncronization (applying milliseconds for time series processing)
print('Time syncronization:')
with progressbar.ProgressBar(max_value=len(individual.groupby('ID'))) as bar:
for baboon,df_baboon in individual.groupby('ID'):
times = list(df_baboon['TIME'].values)
d = Counter(times)
result = []
for timestamp in np.unique(times):
for i in range(0,d[timestamp]):
result.append(str(timestamp+i*1000/d[timestamp]))
individual.loc[individual['ID'] == baboon,'TIME'] = result
bar.update(1)
#Time series process
ts_process = time_series_processing(window, 'TIME_S', individual, 'COLLAR', ['COLLAR', 'TIME', 'X_AXIS','Y_AXIS','Z_AXIS'])
#Aggregation and tsfresh
ts_process.do_process()
individual = ts_process.get_processed_dataframe()
individual.to_csv('noise2/processed_data/'+str(window)+'/agg/'+str(day)+'.csv', index = False)
#NEtwork inference process
ni = network_inference_process(individual, 'TIME_S_mean')
#Inference
ni.do_process()
final = ni.get_processed_dataframe()
final.to_csv('noise2/processed_data/'+str(window)+'/net/'+str(day)+'.csv', index = False)
#Saving not aggregated ground truth
ground_truth = final[['ID_mean', 'TIME_S_mean', 'LABEL_O_values', 'LABEL_F_values']].copy()
#Neighbor features process
neighbors_features_f = ni.get_neighbor_features(final, 'TIME_S_mean', 'ID_mean')
neighbors_features_f = neighbors_features_f.drop(['LABEL_O_values_n', 'LABEL_F_values_n'], axis=1)
neighbors_features_f.to_csv('noise2/processed_data/'+str(window)+'/net/'+str(day)+'_neigh.csv', index = False)
# Final features dataframe
final_neigh = pd.merge(final, neighbors_features_f, how='left', left_on=['TIME_S_mean','ID_mean'], right_on = ['TIME_S_mean_n','BABOON_NODE_n'])
final_neigh.to_csv('noise2/processed_data/'+str(window)+'/complete/'+str(day)+'.csv', index = False)
return
Итак, как вы можете видеть, последнее утверждение является записью в файл, и оно выполняется всеми процессами, на самом деле я не думаю, что проблема заключается в этой функции.