Я использую Pandas для предварительной обработки больших данных (10 миллионов).Вместо того, чтобы извлекать все по времени, я пытаюсь извлечь данные несколько раз и поочередно использовать многопроцессорную обработку.Для теста есть 10000 строк в курсорах и выборка 1000 на время:
for i in range(loop_num): # loop_num=10
begin_time = time.time()
rows = cursor.fetchmany(1000) # get 1000 for a time
end_time1 = time.time()
print("cost %.2f seconds fetching data for No%d time" % (end_time1-begin_time, i))
# data preprocessing
print("preprocessing data for one time...")
p = Pool(5) # use multiprocessing
results = []
tasks_num = 10
rows_num = int(math.ceil(1000 / tasks_num)) # rows in each subprocess
for j in range(tasks_num):
row = rows[(j * rows_num):((j + 1) * rows_num)]
print("get %d rows" % len(row))
# save preprocessed data in results
results.append(p.apply_async(onetime_clean, args=(table_cols, row, predict_month), error_callback=bar))
print('Waiting for all subprocesses done...')
p.close()
p.join()
df_nps_sca = pd.DataFrame(columns=results[0].get().columns.tolist())
# get the result as a dataframe
for m in range(len(results)):
df_nps_sca = df_nps_sca.append(results[m].get(),ignore_index=True, sort=False)
print(df_nps_sca.shape)
end_time_per = time.time()
print("cost %.2f seconds NO%d time" % (end_time_per-begin_time, i))
# end for
print("data preprocessing finished!")
conn.close()
А вот моя функция onetime_clean:
def onetime_clean(table_cols, rows, predict_month):
num_columns = ['AGE', 'DURATION', 'ENTERTAINMENT', 'SOCIALITY', 'LIFE']
scene_columns = ['SCENIC_SCENE', 'TRAFIC_SCENE','OFFICE_SCENE','PUBLIC_SCENE']
# change rows into dataframe
df_nps = pd.DataFrame(rows, columns=table_cols)
# call another function which just do some transfer
df_nps = data_transfer(df_nps)
# load model imputer and scaler from training data
sc_x = joblib.load('./sc_X.pkl')
imputer = joblib.load('./imputer.pkl')
# using imputer fillna
df_nps[num_columns] = imputer.transform(df_nps[num_columns])
# scaler without the first col and change them into dataframe
df_nps_sca = sc_x.transform(df_nps.iloc[:, 1:]) # got error this line
df_nps_sca = pd.DataFrame(df_nps_sca, index=df_nps.index, columns=sca_cols)
return df_nps_sca
Когда я выполняю, я получаю ошибки внекоторые подпроцессы (не все), в то время как остальные успешны:
ошибка: операнды не могут быть переданы вместе с фигурами (100,62) (63,) (100,62)
Похоже, что некоторые переменные изменяются одновременно более чем одной подпроцессой, но я не знаю почему.Понятия не имею, помогите пожалуйста.