Как использовать параллельную обработку для цикла for, используя несколько аргументов? - PullRequest
0 голосов
/ 08 ноября 2019

У меня есть функция, которая зацикливается на тысячах строк, и для ее обработки требуется время. Я пытаюсь применить параллельную обработку, используя pool.map и не знаю, как это сделать.

def Recommendation(i,resourceDf,standDf,standards,resources):

    Standard_list = standards.iloc[i]["Standard Grades"]
    Standard_list = list(Standard_list.split(" "))
    Standard_list = Standard_list[0].split(",")
    Filtered_Res = resources[resources["Grades"].str.split(",",expand=True).isin(Standard_list).any(1)]
    std1 = Filtered_Res.index
    contents_F = resources.iloc[std1]["Evaluation Code"]
    guid = str(standards.loc[i]["GUID"])
    d1 = resourceDf[std1]
    d2 = standDf[[i]]

    d2 = normalise(d2)
    d1 = normalise(d1)

    final = np.dot(d2,d1.T)
    Filter_score_list = final[0].tolist()
    result_filtered = {}
    result_filtered["GUID"]=guid

    Evaluation = {}
    for k in range(0,len(contents_F)):
        Evaluation[str(contents_F.iloc[k])] = Filter_score_list[k]
    sorted_Evaluation = dict( sorted(Evaluation.items(), key=operator.itemgetter(1),reverse=True))
    sorted_Evaluation = dict(itertools.islice(sorted_Evaluation.items(), 100))
    result_filtered.update(sorted_Evaluation)

    db.Eng_Rec.insert_one(result_filtered)

 def main():
     resources = pd.read_csv("\Res_cleaned.csv")
     standards = pd.read_csv("\Std_cleaned.csv")
     resources_length = len(resources)

     fname = "D:\Maths_Model.bin"
     model = Doc2Vec.load(fname)
     vec_df = model.docvecs.vectors_docs
     resourceDf = vec_df[:resources_length,:]
     standDf = vec_df[resources_length:,:]
     num_partitions = 5
     df_split = np.array_split(standards, num_partitions)
     for i in range(len(df_split)):
         pool = Pool(4)
         func = partial(Recommendation,i,resourceDf,standDf,standards,resources)
         pool.starmap(func,df_split)
         pool.close()
         pool.join()

if __name__ == "__main__":
   main()

Этот код показывает ошибку, что позиционные аргументы неверны. Данные стандартов разделены на 5 блоков, и они должны проходить через каждый стандарт в каждом блоке как параллельный процесс. resourceDf, standDf данные постоянно меняются для каждого i в итерации.

...