Простой работоспособный пример многопроцессорной обработки - PullRequest
0 голосов
/ 07 февраля 2019

Я ищу простой пример Python multiprocessing.

Я пытаюсь найти работоспособный пример Python multiprocessing.Я нашел пример разбиения больших чисел на простые числа.Это сработало, потому что было мало входных данных (одно большое число на ядро) и много вычислений (деление чисел на простые числа).

Однако меня интересует другое - у меня много входных данных, по которым я выполняю простые вычисления.Интересно, есть ли простой способ изменить приведенный ниже код, чтобы многоядерные процессоры действительно превосходили одноядерное?Я использую Python 3.6 на машине Win10 с 4 физическими ядрами и 16 ГБ оперативной памяти.

Вот мой пример кода.

import numpy as np
import multiprocessing as mp
import timeit

# comment the following line to get version without queue
queue = mp.Queue()
cores_no = 4


def npv_zcb(bnd_info, cores_no):

     bnds_no = len(bnd_info)
     npvs = []

     for bnd_idx in range(bnds_no):

         nom = bnd_info[bnd_idx][0]
         mat = bnd_info[bnd_idx][1]
         yld = bnd_info[bnd_idx][2]

         npvs.append(nom / ((1 + yld) ** mat))

     if cores_no == 1:
         return npvs
     # comment the following two lines to get version without queue
     else:
         queue.put(npvs)

# generate random attributes of zero coupon bonds

print('Generating random zero coupon bonds...')


bnds_no = 100

bnd_info = np.zeros([bnds_no, 3])
bnd_info[:, 0] = np.random.randint(1, 31, size=bnds_no)
bnd_info[:, 1] = np.random.randint(70, 151, size=bnds_no)
bnd_info[:, 2] = np.random.randint(0, 100, size=bnds_no) / 100
bnd_info = bnd_info.tolist()

# single core
print('Running single core...')
start = timeit.default_timer()
npvs = npv_zcb(bnd_info, 1)
print('   elapsed time: ', timeit.default_timer() - start, ' seconds')

# multiprocessing
print('Running multiprocessing...')
print('   ', cores_no, ' core(s)...')
start = timeit.default_timer()

processes = []

idx = list(range(0, bnds_no, int(bnds_no / cores_no)))
idx.append(bnds_no + 1)

for core_idx in range(cores_no):
     input_data = bnd_info[idx[core_idx]: idx[core_idx + 1]]

     process = mp.Process(target=npv_zcb,
                          args=(input_data, cores_no))
     processes.append(process)
     process.start()

for process_aux in processes:
     process_aux.join()

# comment the following three lines to get version without queue
mylist = []
while not queue.empty():
     mylist.append(queue.get())

print('   elapsed time: ', timeit.default_timer() - start, ' seconds')

Я был бы очень признателен, если бы кто-нибудь посоветовал мне, как изменить код так, чтобы многоядерный запуск превосходил одноядерный запуск.Я также заметил, что увеличение переменной bnds_no до 1000 приводит к BrokenPipeError.Можно было бы ожидать, что увеличение количества ввода приведет к увеличению вычислительного времени, а не к ошибке ... Что здесь не так?

Ответы [ 6 ]

0 голосов
/ 15 июня 2019

После некоторой помощи со стороны коллеги мне удалось создать простой фрагмент кода, который фактически выполнялся, как и ожидалось.Я был почти там - мой код нуждался в нескольких тонких (но решающих) модификациях.Чтобы запустить код, откройте приглашение anaconda, введите python -m idlelib, откройте файл и запустите его.

import pandas as pd
import numpy as np
import csv
import multiprocessing as mp
import timeit


def npv_zcb(core_idx, bnd_file, delimiter=','):
    """
    Michal Mackanic
    06/05/2019 v1.0

    Load bond positions from a .csv file, value the bonds and save results
    back to a .csv file.

    inputs:
        bnd_file: str
            full path to a .csv file with bond positions
        delimiter: str
            delimiter to be used in .csv file
    outputs:
        a .csv file with additional field npv.

    dependencies:

    example:
        npv_zcb('C:\\temp\\bnd_aux.csv', ',')
    """

    # core idx
    print('   npv_zcb() starting on core ' + str(core_idx))

    # load the input file as a dataframe
    bnd_info = pd.read_csv(bnd_file,
                           sep=delimiter,
                           quoting=2,  # csv.QUOTE_NONNUMERIC
                           header=0,
                           doublequote=True,
                           low_memory=False)

    # convert dataframe into list of dictionaries
    bnd_info = bnd_info.to_dict(orient='records')

    # get number of bonds in the file
    bnds_no = len(bnd_info)

    # go bond by bond
    for bnd_idx in range(bnds_no):
        mat = bnd_info[bnd_idx]['maturity']
        nom = bnd_info[bnd_idx]['nominal']
        yld = bnd_info[bnd_idx]['yld']
        bnd_info[bnd_idx]['npv'] = nom / ((1 + yld) ** mat)

    # covert list of dictionaries back to dataframe and save it as .csv file
    bnd_info = pd.DataFrame(bnd_info)
    bnd_info.to_csv(bnd_file,
                    sep=delimiter,
                    quoting=csv.QUOTE_NONNUMERIC,
                    quotechar='"',
                    index=False)

    # core idx
    print('   npv_zcb() finished on core ' + str(core_idx))

    # everything OK
    return True


def main(cores_no, bnds_no, path, delimiter):

    if __name__ == '__main__':
        mp.freeze_support()

        # generate random attributes of zero coupon bonds
        print('Generating random zero coupon bonds...')
        bnd_info = np.zeros([bnds_no, 3])
        bnd_info[:, 0] = np.random.randint(1, 31, size=bnds_no)
        bnd_info[:, 1] = np.random.randint(70, 151, size=bnds_no)
        bnd_info[:, 2] = np.random.randint(0, 100, size=bnds_no) / 100
        bnd_info = zip(bnd_info[:, 0], bnd_info[:, 1], bnd_info[:, 2])
        bnd_info = [{'maturity': mat,
                     'nominal': nom,
                     'yld': yld} for mat, nom, yld in bnd_info]
        bnd_info = pd.DataFrame(bnd_info)

        # save bond positions into a .csv file
        bnd_info.to_csv(path + 'bnd_aux.csv',
                        sep=delimiter,
                        quoting=csv.QUOTE_NONNUMERIC,
                        quotechar='"',
                        index=False)

        # prepare one .csv file per core
        print('Preparing input files...')

        idx = list(range(0, bnds_no, int(bnds_no / cores_no)))
        idx.append(bnds_no + 1)

        for core_idx in range(cores_no):
            # save bond positions into a .csv file
            file_name = path + 'bnd_aux_' + str(core_idx) + '.csv'
            bnd_info_aux = bnd_info[idx[core_idx]: idx[core_idx + 1]]
            bnd_info_aux.to_csv(file_name,
                                sep=delimiter,
                                quoting=csv.QUOTE_NONNUMERIC,
                                quotechar='"',
                                index=False)

        # SINGLE CORE
        print('Running single core...')

        start = timeit.default_timer()

        # evaluate bond positions
        npv_zcb(1, path + 'bnd_aux.csv', delimiter)

        print('   elapsed time: ', timeit.default_timer() - start, ' seconds')

        # MULTIPLE CORES
        # spread calculation among several cores
        print('Running multiprocessing...')
        print('   ', cores_no, ' core(s)...')

        start = timeit.default_timer()

        processes = []

        # go core by core
        print('        spreading calculation among processes...')
        for core_idx in range(cores_no):
            # run calculations
            file_name = path + 'bnd_aux_' + str(core_idx) + '.csv'
            process = mp.Process(target=npv_zcb,
                                     args=(core_idx, file_name, delimiter))
            processes.append(process)
            process.start()

        # wait till every process is finished
        print('        waiting for all processes to finish...')
        for process in processes:
            process.join()

        print('   elapsed time: ', timeit.default_timer() - start, ' seconds')


main(cores_no=2,
     bnds_no=1000000,
     path='C:\\temp\\',
     delimiter=',')
0 голосов
/ 06 мая 2019

ОК - наконец-то я нашел ответ.Многопроцессорная обработка не работает в Windows.Следующий код прекрасно работает в Ubuntu (Ubuntu 19.04 и python 3.7), но не в Windows (Win10 и python 3.6).Надеюсь, что это помогает другим ...

import pandas as pd
import numpy as np
import csv
import multiprocessing as mp
import timeit


def npv_zcb(bnd_file, delimiter=','):
    """
    Michal Mackanic
    06/05/2019 v1.0

    Load bond positions from a .csv file, value the bonds and save results
    back to a .csv file.

    inputs:
        bnd_file: str
            full path to a .csv file with bond positions
        delimiter: str
            delimiter to be used in .csv file
    outputs:
        a .csv file with additional field npv.

    dependencies:

    example:
        npv_zcb('C:\\temp\\bnd_aux.csv', ',')
    """

    # load the input file as a dataframe
    bnd_info = pd.read_csv(bnd_file,
                           sep=delimiter,
                           quoting=2,  # csv.QUOTE_NONNUMERIC
                           doublequote=True,
                           low_memory=False)

    # convert dataframe into list of dictionaries
    bnd_info = bnd_info.to_dict(orient='records')

    # get number of bonds in the file
    bnds_no = len(bnd_info)

    # go bond by bond
    for bnd_idx in range(bnds_no):
        mat = bnd_info[bnd_idx]['maturity']
        nom = bnd_info[bnd_idx]['nominal']
        yld = bnd_info[bnd_idx]['yld']
        bnd_info[bnd_idx]['npv'] = nom / ((1 + yld) ** mat)

    # covert list of dictionaries back to dataframe and save it as .csv file
    bnd_info = pd.DataFrame(bnd_info)
    bnd_info.to_csv(bnd_file,
                    sep=delimiter,
                    quoting=csv.QUOTE_NONNUMERIC,
                    quotechar='"',
                    index=False)

    return


def main(cores_no, bnds_no, path, delimiter):

    # generate random attributes of zero coupon bonds
    print('Generating random zero coupon bonds...')
    bnd_info = np.zeros([bnds_no, 3])
    bnd_info[:, 0] = np.random.randint(1, 31, size=bnds_no)
    bnd_info[:, 1] = np.random.randint(70, 151, size=bnds_no)
    bnd_info[:, 2] = np.random.randint(0, 100, size=bnds_no) / 100
    bnd_info = zip(bnd_info[:, 0], bnd_info[:, 1], bnd_info[:, 2])
    bnd_info = [{'maturity': mat,
                 'nominal': nom,
                 'yld': yld} for mat, nom, yld in bnd_info]
    bnd_info = pd.DataFrame(bnd_info)

    # save bond positions into a .csv file
    bnd_info.to_csv(path + 'bnd_aux.csv',
                    sep=delimiter,
                    quoting=csv.QUOTE_NONNUMERIC,
                    quotechar='"',
                    index=False)

    # prepare one .csv file per core
    print('Preparing input files...')

    idx = list(range(0, bnds_no, int(bnds_no / cores_no)))
    idx.append(bnds_no + 1)

    for core_idx in range(cores_no):
        # save bond positions into a .csv file
        file_name = path + 'bnd_aux_' + str(core_idx) + '.csv'
        bnd_info_aux = bnd_info[idx[core_idx]: idx[core_idx + 1]]
        bnd_info_aux.to_csv(file_name,
                            sep=delimiter,
                            quoting=csv.QUOTE_NONNUMERIC,
                            quotechar='"',
                            index=False)

    # SINGLE CORE
    print('Running single core...')

    start = timeit.default_timer()

    # evaluate bond positions
    npv_zcb(path + 'bnd_aux.csv', delimiter)

    print('   elapsed time: ', timeit.default_timer() - start, ' seconds')

    # MULTIPLE CORES
    if __name__ == '__main__':

        # spread calculation among several cores
        print('Running multiprocessing...')
        print('   ', cores_no, ' core(s)...')

        start = timeit.default_timer()

        processes = []

        # go core by core
        print('        spreading calculation among processes...')
        for core_idx in range(cores_no):
            # run calculations
            file_name = path + 'bnd_aux_' + str(core_idx) + '.csv'
            process = mp.Process(target=npv_zcb,
                                 args=(file_name, delimiter))
            processes.append(process)
            process.start()

        # wait till every process is finished
        print('        waiting for all processes to finish...')
        for process in processes:
            process.join()

    print('   elapsed time: ', timeit.default_timer() - start, ' seconds')

main(cores_no=2,
     bnds_no=1000000,
     path='/home/macky/',
     delimiter=',')
0 голосов
/ 08 февраля 2019

Это не отвечает на ваш вопрос - я только публикую его, чтобы проиллюстрировать то, что я сказал в комментариях о том, когда многопроцессорная обработка может ускорить обработку.

В приведенном ниже коде, основанном наВаш, я добавил REPEAT константу, которая заставляет npv_zcb() выполнять вычисления снова и снова, чтобы имитировать использование процессора больше.Изменение значения этой константы обычно замедляет или ускоряет одноядерную обработку гораздо больше, чем многопроцессорную часть - фактически она практически не влияет на последнюю.

import numpy as np
import multiprocessing as mp
import timeit


np.random.seed(42)  # Generate same set of random numbers for testing.

REPEAT = 10  # Number of times to repeat computations performed in npv_zcb.


def npv_zcb(bnd_info, queue):

    npvs = []

    for _ in range(REPEAT):  # To simulate more computations.

        for bnd_idx in range(len(bnd_info)):

            nom = bnd_info[bnd_idx][0]
            mat = bnd_info[bnd_idx][1]
            yld = bnd_info[bnd_idx][2]
            v = nom / ((1 + yld) ** mat)

    npvs.append(v)

    if queue:
        queue.put(npvs)
    else:
        return npvs


if __name__ == '__main__':

    print('Generating random zero coupon bonds...')
    print()

    bnds_no = 100
    cores_no = 4

    # generate random attributes of zero coupon bonds

    bnd_info = np.zeros([bnds_no, 3])
    bnd_info[:, 0] = np.random.randint(1, 31, size=bnds_no)
    bnd_info[:, 1] = np.random.randint(70, 151, size=bnds_no)
    bnd_info[:, 2] = np.random.randint(0, 100, size=bnds_no) / 100
    bnd_info = bnd_info.tolist()

    # single core
    print('Running single core...')
    start = timeit.default_timer()
    npvs = npv_zcb(bnd_info, None)
    print('   elapsed time: {:.6f} seconds'.format(timeit.default_timer() - start))

    # multiprocessing
    print()
    print('Running multiprocessing...')
    print('  ', cores_no, ' core(s)...')
    start = timeit.default_timer()

    queue = mp.Queue()
    processes = []

    idx = list(range(0, bnds_no, int(bnds_no / cores_no)))
    idx.append(bnds_no + 1)

    for core_idx in range(cores_no):
        input_data = bnd_info[idx[core_idx]: idx[core_idx + 1]]

        process = mp.Process(target=npv_zcb, args=(input_data, queue))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    mylist = []
    while not queue.empty():
        mylist.extend(queue.get())

    print('   elapsed time: {:.6f} seconds'.format(timeit.default_timer() - start))
0 голосов
/ 07 февраля 2019

ОК, поэтому я удалил части, связанные с очередью, из кода, чтобы посмотреть, избавится ли от BrokenPipeError (выше я обновил исходный код, указав, что следует закомментировать).К сожалению, это не помогло.

Я тестировал код на своем персональном компьютере с Linux (Ubuntu 18.10, python 3.6.7).Удивительно, но код ведет себя по-разному на двух системах.В Linux версия без очереди работает без проблем;версия с очередью работает вечно.В Windows нет никакой разницы - я всегда получаю BrokenPipeError.

PS: в каком-то другом посте ( Нет многопроцессорной печати (Spyder) ) я обнаружил, что могут быть некоторыепроблема с многопроцессорностью при использовании редактора Spyder.Я испытал точно такую ​​же проблему на машине Windows.Так что не все примеры в официальной документации работают должным образом ...

0 голосов
/ 07 февраля 2019

BrokenPipeError не из-за большего ввода, но из-за состояния гонки, которое возникает из-за использования queue.empty() и queue.get() в отдельных шагах.

Вы его не видитепри меньшем количестве входных данных в большинстве случаев это происходит потому, что элементы очереди обрабатываются довольно быстро, и состояние гонки не возникает, но при использовании больших наборов данных шансы на состояние гонки возрастают.

Даже при меньших входах попробуйте запустить сценарий несколько раз,может быть 10 15 раз, и вы увидите BrokenPipeError происходит.

Одним из решений этой проблемы является передача значения часового значения в очередь, которое вы можете использовать для проверки того, были ли обработаны все данные в очереди.

Попробуйте изменить код так:

q = mp.Queue()
 <put the data in the queue>
 q.put(None)


while True:
    data = q.get()
    if data is not None:
        <process the data here >
    else:
        q.put(None)
        return
0 голосов
/ 07 февраля 2019

Это не дает прямого ответа на ваш вопрос, но если вы использовали RxPy для реактивного программирования на Python, вы можете проверить их небольшой пример по многопроцессорности: https://github.com/ReactiveX/RxPY/tree/release/v1.6.x#concurrency

Кажется, немного проще управлять параллелизмом с ReactiveX /RxPy, чем пытаться сделать это вручную.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...