У меня есть функция, которая зацикливается на тысячах строк, и для ее обработки требуется время. Я пытаюсь применить параллельную обработку, используя pool.map
и не знаю, как это сделать.
def Recommendation(i,resourceDf,standDf,standards,resources):
Standard_list = standards.iloc[i]["Standard Grades"]
Standard_list = list(Standard_list.split(" "))
Standard_list = Standard_list[0].split(",")
Filtered_Res = resources[resources["Grades"].str.split(",",expand=True).isin(Standard_list).any(1)]
std1 = Filtered_Res.index
contents_F = resources.iloc[std1]["Evaluation Code"]
guid = str(standards.loc[i]["GUID"])
d1 = resourceDf[std1]
d2 = standDf[[i]]
d2 = normalise(d2)
d1 = normalise(d1)
final = np.dot(d2,d1.T)
Filter_score_list = final[0].tolist()
result_filtered = {}
result_filtered["GUID"]=guid
Evaluation = {}
for k in range(0,len(contents_F)):
Evaluation[str(contents_F.iloc[k])] = Filter_score_list[k]
sorted_Evaluation = dict( sorted(Evaluation.items(), key=operator.itemgetter(1),reverse=True))
sorted_Evaluation = dict(itertools.islice(sorted_Evaluation.items(), 100))
result_filtered.update(sorted_Evaluation)
db.Eng_Rec.insert_one(result_filtered)
def main():
resources = pd.read_csv("\Res_cleaned.csv")
standards = pd.read_csv("\Std_cleaned.csv")
resources_length = len(resources)
fname = "D:\Maths_Model.bin"
model = Doc2Vec.load(fname)
vec_df = model.docvecs.vectors_docs
resourceDf = vec_df[:resources_length,:]
standDf = vec_df[resources_length:,:]
num_partitions = 5
df_split = np.array_split(standards, num_partitions)
for i in range(len(df_split)):
pool = Pool(4)
func = partial(Recommendation,i,resourceDf,standDf,standards,resources)
pool.starmap(func,df_split)
pool.close()
pool.join()
if __name__ == "__main__":
main()
Этот код показывает ошибку, что позиционные аргументы неверны. Данные стандартов разделены на 5 блоков, и они должны проходить через каждый стандарт в каждом блоке как параллельный процесс. resourceDf
, standDf
данные постоянно меняются для каждого i
в итерации.