Multiprocessing Pool выполняет весь код для каждого созданного процесса, а не только переданную ему функцию - PullRequest
2 голосов
/ 17 июня 2019

Python-код выполняет извлечение данных и предварительную обработку (для преобразования данных в итерацию для запуска Pool Map), а затем вызывает Pool Map для автоматического размещения данных и их параллельной работы.Однако я обнаружил, что сбор данных и предварительная обработка снова выполняются в каждом процессе в дополнение к функции, которая должна выполняться в каждом процессе.

Код выглядит следующим образом:

#Data Pull
orig_data= pd.read_csv(<fileath>)

#Data Preprocessing
start=time.time()
transformed_input=list() # transformed_input is populated with data from orig_data dataframe as required
end=time.time()
print("Preprocessing Time")
print(end-start)


def examplefunction(transformed_input_sub):
    # Required Function
    return output

if __name__ == '__main__':
    from multiprocessing import Pool
    p=Pool()
    s=time.time()
    output=p.map(examplefunction,transformed_input)
    p.close()
    p.join()
    e=time.time()
    print("Time Taken")  
    print(e-s)

Ожидаемый результат:
Время предварительной обработки
5.01
Время выполнения
10

Фактический выход при работе в 4 процессах:
Время предварительной обработки
5.01
Время предварительной обработки
5.12
Время предварительной обработки
5,35
Время предварительной обработки
5,41
Время выполнения
10

Тем не менее, вывод этого кода правильный (и проверен), несмотря на то, что предварительная обработка выполняется для каждого процесса, что, насколько я знаю, не должно быть.Тогда возникает проблема, когда вместо извлечения данных из CSV-файла, содержащего только небольшую часть данных, я использую Teradata Pull для почти 50 миллионов записей.Функция извлечения TD без многопроцессорной обработки работает отлично, но при многопроцессорной обработке выдает следующую ошибку:

Нет больше места в буфере в имени базы данных

ЭтоВероятно, каждый процесс выполняет TD Pull.
Как я могу гарантировать, что пул запускает только функцию, переданную ему параллельно, а не полный код с нуля?

Я попытался изменить порядок блоков кода, вызвав операторы импорта вне блока name == 'main', а также поместил данные и код предварительной обработки в отдельный код Python и вызвал его с помощью exec ().в рамках основной программы.

...