Преобразование многопроцессорной обработки Python в PySpark - PullRequest
1 голос
/ 24 октября 2019

Здравствуйте. Я пытаюсь преобразовать использование многопроцессорной обработки в Python в PySpark. Я не эксперт в PySpark, поэтому сначала пытаюсь понять, возможно ли это сделать. И если это так, как я могу это сделать?

import multiprocessing as mp
import pandas as pd

def connectToDB():
    # BLABLABLA
    return connectionInstance, cursorInstance 

def construct_each_company(tmpDF_forPeerGroup, ii, start_str, end_str):

    connectionInstance, cursorInstance = connectToDB()
    sql = "SELECT * FROM DataBase.Scores WHERE company_idx = "+str(idx)+" AND timestamp BETWEEN '"+start_str+"' AND '"+end_str+"'"
    tmpDF = pd.read_sql(sql, con=connectionInstance)

    return (ii, tmpList_forThisCompany, parall_Name)

def main():
    tmpDF_forPeerGroup = ['company1','company2','company3','company4','company5']
    start_str = "2019-06-20"
    end_str = "2019-10-17"

    pool = mp.Pool(8)

    results = pool.starmap_async(construct_each_company, [(tmpDF_forPeerGroup, i, start_str, end_str) for i in range(0, len(tmpDF_forPeerGroup))]).get()
    pool.close()

    results.sort(key=lambda x: x[0])
    finalListForCompanies = [r2 for (r1, r2, r3) in results]  

В настоящее время у меня возникают проблемы с многопроцессорной обработкой Python, так как она застревает, когда я пытаюсь в итоге собрать более 150 таблиц (я не знаю почему). Мой предел запросов к базе данных MySQL составляет 600, поэтому я не понимаю, почему он зависает.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...