Я новичок в параллельных вычислениях и хотел бы узнать, как использовать параллельные вычисления с Pandas датафреймом. Я пытаюсь воспроизвести результаты этого урока . Я сделал очень незначительные изменения, и вот код:
import sys
import random
import pandas as pd
import numpy as np
import time
from multiprocessing import Pool
def parallelize_dataframe(df, func):
df_split = np.array_split(df, 3)
pool = Pool(4)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
def add_features(df):
df['question_text'] = df['question_text'].apply(lambda x:str(x))
df["lower_question_text"] = df["question_text"].apply(lambda x: x.lower())
df['total_length'] = df['question_text'].apply(len)
df['capitals'] = df['question_text'].apply(lambda comment: sum(1 for c in comment if c.isupper()))
df['caps_vs_length'] = df.apply(lambda row: float(row['capitals'])/float(row['total_length']),
axis=1)
df['num_words'] = df.question_text.str.count('\S+')
df['num_unique_words'] = df['question_text'].apply(lambda comment: len(set(w for w in comment.split())))
df['words_vs_unique'] = df['num_unique_words'] / df['num_words']
df['num_exclamation_marks'] = df['question_text'].apply(lambda comment: comment.count('!'))
df['num_question_marks'] = df['question_text'].apply(lambda comment: comment.count('?'))
df['num_punctuation'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in '.,;:'))
df['num_symbols'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in '*&$%'))
df['num_smilies'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in (':-)', ':)', ';-)', ';)')))
df['num_sad'] = df['question_text'].apply(lambda comment: sum(comment.count(w) for w in (':-<', ':()', ';-()', ';(')))
df["mean_word_len"] = df["question_text"].apply(lambda x: np.mean([len(w) for w in str(x).split()]))
return df
if __name__ == '__main__':
train_df = pd.read_csv("train.csv")
#train_df = train_df[0:1000000]
print(len(train_df))
ts1 = time.time()
train = add_features(train_df)
te1 = time.time()
print('%2.2f s' % (te1 - ts1))
ts2 = time.time()
train = parallelize_dataframe(train_df, add_features)
te2 = time.time()
print('%2.2f s' % (te2 - ts2))
Я получаю некоторые странные, ненадежные результаты выполнения кода. В течение более 50% времени код выполнялся успешно и давал ожидаемые результаты, такие как
63.69 s
24.98 s
Но примерно в 10% случаев он просто завершал sh выполнение без отображения любые ожидаемые print
операторы.
Еще около 10-20% это будет конечной sh функцией add_features()
, но никогда не будет конечной sh parallelize_dataframe()
. Я бы увидел 4 Python процессов, все еще работающих в фоновом режиме, но в течение более часа он просто продолжает работать. Выведите как:
68.43 s
Может кто-нибудь объяснить, что происходит под капотом, и как это исправить?