Здравствуйте. Я пытаюсь преобразовать использование многопроцессорной обработки в Python в PySpark. Я не эксперт в PySpark, поэтому сначала пытаюсь понять, возможно ли это сделать. И если это так, как я могу это сделать?
import multiprocessing as mp
import pandas as pd
def connectToDB():
# BLABLABLA
return connectionInstance, cursorInstance
def construct_each_company(tmpDF_forPeerGroup, ii, start_str, end_str):
connectionInstance, cursorInstance = connectToDB()
sql = "SELECT * FROM DataBase.Scores WHERE company_idx = "+str(idx)+" AND timestamp BETWEEN '"+start_str+"' AND '"+end_str+"'"
tmpDF = pd.read_sql(sql, con=connectionInstance)
return (ii, tmpList_forThisCompany, parall_Name)
def main():
tmpDF_forPeerGroup = ['company1','company2','company3','company4','company5']
start_str = "2019-06-20"
end_str = "2019-10-17"
pool = mp.Pool(8)
results = pool.starmap_async(construct_each_company, [(tmpDF_forPeerGroup, i, start_str, end_str) for i in range(0, len(tmpDF_forPeerGroup))]).get()
pool.close()
results.sort(key=lambda x: x[0])
finalListForCompanies = [r2 for (r1, r2, r3) in results]
В настоящее время у меня возникают проблемы с многопроцессорной обработкой Python, так как она застревает, когда я пытаюсь в итоге собрать более 150 таблиц (я не знаю почему). Мой предел запросов к базе данных MySQL составляет 600, поэтому я не понимаю, почему он зависает.