предварительная обработка данных с использованием Multiprocessing Pool - PullRequest
0 голосов
/ 27 октября 2018

Я использую Pandas для предварительной обработки больших данных (10 миллионов).Вместо того, чтобы извлекать все по времени, я пытаюсь извлечь данные несколько раз и поочередно использовать многопроцессорную обработку.Для теста есть 10000 строк в курсорах и выборка 1000 на время:

for i in range(loop_num): # loop_num=10
    begin_time = time.time()
    rows = cursor.fetchmany(1000)  # get 1000 for a time
    end_time1 = time.time()
    print("cost %.2f seconds fetching data for No%d time" % (end_time1-begin_time, i))

    # data preprocessing
    print("preprocessing data for one time...")
    p = Pool(5)  # use multiprocessing
    results = []
    tasks_num = 10
    rows_num = int(math.ceil(1000 / tasks_num)) # rows in each subprocess

    for j in range(tasks_num):
        row = rows[(j * rows_num):((j + 1) * rows_num)]
        print("get %d rows" % len(row))
        # save preprocessed data in results
        results.append(p.apply_async(onetime_clean, args=(table_cols, row, predict_month), error_callback=bar))

    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    df_nps_sca = pd.DataFrame(columns=results[0].get().columns.tolist())

    # get the result as a dataframe
    for m in range(len(results)):
        df_nps_sca = df_nps_sca.append(results[m].get(),ignore_index=True, sort=False)
    print(df_nps_sca.shape)
    end_time_per = time.time()
    print("cost %.2f seconds NO%d time" % (end_time_per-begin_time, i))
# end for
print("data preprocessing finished!")
conn.close()

А вот моя функция onetime_clean:

def onetime_clean(table_cols, rows, predict_month):
    num_columns = ['AGE', 'DURATION', 'ENTERTAINMENT', 'SOCIALITY', 'LIFE']
    scene_columns = ['SCENIC_SCENE', 'TRAFIC_SCENE','OFFICE_SCENE','PUBLIC_SCENE']

    # change rows into dataframe
    df_nps = pd.DataFrame(rows, columns=table_cols)
    # call another function which just do some transfer
    df_nps = data_transfer(df_nps)

    # load model imputer and scaler from training data
    sc_x = joblib.load('./sc_X.pkl')
    imputer = joblib.load('./imputer.pkl')

    # using imputer fillna  
    df_nps[num_columns] = imputer.transform(df_nps[num_columns])

    # scaler without the first col and change them into dataframe
    df_nps_sca = sc_x.transform(df_nps.iloc[:, 1:])  # got error this line
    df_nps_sca = pd.DataFrame(df_nps_sca, index=df_nps.index, columns=sca_cols)
    return df_nps_sca 

Когда я выполняю, я получаю ошибки внекоторые подпроцессы (не все), в то время как остальные успешны:

ошибка: операнды не могут быть переданы вместе с фигурами (100,62) (63,) (100,62)

Похоже, что некоторые переменные изменяются одновременно более чем одной подпроцессой, но я не знаю почему.Понятия не имею, помогите пожалуйста.

...