Непоследовательные и иногда неудачные результаты при комбинировании многопроцессорной обработки и pandas в Python - PullRequest
0 голосов
/ 02 марта 2020

Я новичок в параллельных вычислениях и хотел бы узнать, как использовать параллельные вычисления с Pandas датафреймом. Я пытаюсь воспроизвести результаты этого урока . Я сделал очень незначительные изменения, и вот код:

import sys
import random
import pandas as pd
import numpy as np
import time
from multiprocessing import Pool

def parallelize_dataframe(df, func):
    df_split = np.array_split(df, 3)
    pool = Pool(4)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

def add_features(df):
    df['question_text'] = df['question_text'].apply(lambda x:str(x))
    df["lower_question_text"] = df["question_text"].apply(lambda x: x.lower())
    df['total_length'] = df['question_text'].apply(len)
    df['capitals'] = df['question_text'].apply(lambda comment: sum(1 for c in comment if c.isupper()))
    df['caps_vs_length'] = df.apply(lambda row: float(row['capitals'])/float(row['total_length']),
                                axis=1)
    df['num_words'] = df.question_text.str.count('\S+')
    df['num_unique_words'] = df['question_text'].apply(lambda comment: len(set(w for w in comment.split())))
    df['words_vs_unique'] = df['num_unique_words'] / df['num_words'] 
    df['num_exclamation_marks'] = df['question_text'].apply(lambda comment: comment.count('!'))
    df['num_question_marks'] = df['question_text'].apply(lambda comment: comment.count('?'))
    df['num_punctuation'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in '.,;:'))
    df['num_symbols'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in '*&$%'))
    df['num_smilies'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in (':-)', ':)', ';-)', ';)')))
    df['num_sad'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in (':-<', ':()', ';-()', ';(')))
    df["mean_word_len"] = df["question_text"].apply(lambda x: np.mean([len(w) for w in str(x).split()]))
    return df

if __name__ == '__main__':
    train_df = pd.read_csv("train.csv")
    #train_df = train_df[0:1000000]
    print(len(train_df))

    ts1 = time.time()
    train = add_features(train_df)
    te1 = time.time()
    print('%2.2f s' % (te1 - ts1))

    ts2 = time.time()
    train = parallelize_dataframe(train_df, add_features) 
    te2 = time.time()
    print('%2.2f s' % (te2 - ts2))

Я получаю некоторые странные, ненадежные результаты выполнения кода. В течение более 50% времени код выполнялся успешно и давал ожидаемые результаты, такие как

63.69 s
24.98 s

Но примерно в 10% случаев он просто завершал sh выполнение без отображения любые ожидаемые print операторы.

Еще около 10-20% это будет конечной sh функцией add_features(), но никогда не будет конечной sh parallelize_dataframe(). Я бы увидел 4 Python процессов, все еще работающих в фоновом режиме, но в течение более часа он просто продолжает работать. Выведите как:

68.43 s

Может кто-нибудь объяснить, что происходит под капотом, и как это исправить?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...