Многопроцессорность не создает лишних процессов - PullRequest
0 голосов
/ 22 октября 2018

Я пытаюсь увеличить скорость моей программы на Python, используя многопроцессорность, но на самом деле она не создает больше процессов.Я просмотрел несколько учебных пособий, но никуда не деться.

Вот оно:

    cpuutil = int((multiprocessing.cpu_count()) / 2)
    p = Pool(processes = cpuutil)
    output = p.map(OSGBtoETRSfunc(data, eastcol, northcol))
    p.close()
    p.join()
    return output

Итак, для меня это должно создать 2 процесса на четырехъядерной машине, но этоне делает.У меня загрузка процессора составляет около 18% ...

Есть идеи?Он выглядит так же, как и учебники, которые я наблюдал ... p.map не работал при перечислении аргументов в квадратных скобках ([]), поэтому я предположил, что он должен быть в синтаксисе, который указан выше?

Спасибо

Ответы [ 2 ]

0 голосов
/ 22 октября 2018

Я не совсем понимаю, что вы хотите, поэтому давайте начнем с простого.Ниже приведен способ простого вызова одной и той же функции в строках кадра данных pd:

import pandas as pd
import numpy as np
import os

import pathos
from contextlib import closing

NUM_PROCESSES = os.cpu_count()
# create some data frame 100x4

nrow = 100
ncol = 4
df = pd.DataFrame(np.random.randint(0,100,size=(nrow, ncol)), columns=list('ABCD'))

# dataframe resides in global scope
# so it is accessible to processes spawned below
# I pass only row indices to each process

# function to be run over rows
# it transforms the given row independently
def foo(idx):
    # extract given row to numpy
    row = df.iloc[[idx]].values[0]
    # you can pass ranges:
    # df[2:3]

    # transform row
    # I return it as list for simplicity of creating dataframe
    row = np.exp(row)

    # return numpy row
    return row


# run pool over range of indexes (0,1, ... , nrow-1)
# and close it afterwars
# there is not reason here to have more workers than number of CPUs
with closing(pathos.multiprocessing.Pool(processes=NUM_PROCESSES)) as pool:    
    results = pool.map(foo, range(nrow))

# create new dataframe from all those numpy slices:
col_names = df.columns.values.tolist()
df_new = pd.DataFrame(np.array(results), columns=col_names)

Что в ваших вычислениях требует более сложной настройки?

РЕДАКТИРОВАТЬ: Хорошо, здесь выполняется дваработает одновременно (я не очень знаком с пандами, так что просто переключитесь на numpy):

# RUNNING TWO FUNCTIONS SIMLTANEOUSLY

import pandas as pd
import numpy as np

from multiprocessing import Process, Queue

# create some data frame 100x4

nrow = 100
ncol = 4
df = pd.DataFrame(np.random.randint(0,100,size=(nrow, ncol)), columns=list('ABCD'))

# dataframe resides in global scope
# so it is accessible to processes spawned below
# I pass only row indices to each process

# function to be run over part1 independently
def proc_func1(q1):

    # get data from queue1
    data1 = q1.get()

    # I extract given data to numpy
    data_numpy = data1.values

    # do something
    data_numpy_new = data_numpy + 1

    # return numpy array to queue 1
    q1.put(data_numpy_new)

    return 


# function to be run over part2 independently
def proc_func2(q2):

    # get data from queue2
    data2 = q2.get()


    # I extract given data to numpy
    data_numpy = data2.values

    # do something
    data_numpy_new = data_numpy - 1


    # return numpy array to queue 2
    q2.put(data_numpy_new)

    return


# instantiate queues
q1 = Queue()
q2 = Queue()

# divide data frame into two parts

part1 = df[:50]
part2 = df[50:]

# send data, so it will already be in queries
q1.put(part1)
q2.put(part2)

# start two processes 
p1 = Process(target=proc_func1, args=(q1,))
p2 = Process(target=proc_func2, args=(q2,))

p1.start()
p2.start()

# wait until they finish
p1.join()
p2.join()


# read results from Queues

res1 = q1.get()
res2 = q2.get()

if (res1 is None) or (res2 is None):
    print('Error!')


# reassemble two results back to single dataframe (might be inefficient)
col_names = df.columns.values.tolist()
# concatenate results along x axis
df_new = pd.DataFrame(np.concatenate([np.array(res1), np.array(res2)], axis=0), columns=col_names)
0 голосов
/ 22 октября 2018

В Python вы должны предоставить функцию и разделенные аргументы.Если нет, вы выполняете функцию OSGBtoETRSfunc во время создания процесса.Вместо этого вы должны предоставить указатель на функцию и список с аргументами.

Ваш случай похож на тот, который показан в Документах Python: https://docs.python.org/3.7/library/multiprocessing.html#introduction

В любом случае, я думаю, что выиспользуете неправильную функцию.Pool.map () работает как map: в списке элементов и применяет одну и ту же функцию к каждому элементу.Я думаю, что ваша функция OSGBtoERTSfunc нуждается в трех параметрах для правильной работы.Пожалуйста, вместо использования p.map () используйте p.apply ()

cpuutil = int((multiprocessing.cpu_count()) / 2)
p = Pool(processes = cpuutil)
output = p.apply(OSGBtoETRSfunc, [data, eastcol, northcol])
p.close()
p.join()
return output
...