С некоторой помощью коллеги мне удалось написать простой фрагмент кода, который действительно работает так, как ожидалось. Я был почти у цели — моему коду не хватало лишь нескольких тонких (но решающих) изменений. Чтобы запустить код, откройте Anaconda Prompt, введите python -m idlelib, откройте файл и запустите его.
import pandas as pd
import numpy as np
import csv
import multiprocessing as mp
import timeit
def npv_zcb(core_idx, bnd_file, delimiter=','):
    """
    Michal Mackanic
    06/05/2019 v1.0

    Load zero coupon bond positions from a .csv file, value the bonds and
    save the results back to the same .csv file.

    inputs:
        core_idx: int
            label of the core / process running the valuation; it is only
            used in the progress messages
        bnd_file: str
            full path to a .csv file with bond positions; the columns
            'maturity', 'nominal' and 'yld' are read
        delimiter: str
            delimiter to be used in .csv file
    outputs:
        True once the file has been rewritten; bnd_file is overwritten
        with the original columns plus an additional field npv, where
        npv = nominal / (1 + yld) ** maturity.
    dependencies:
        pandas, numpy, csv
    example:
        npv_zcb(0, 'C:\\temp\\bnd_aux.csv', ',')
    """
    # core idx
    print(' npv_zcb() starting on core ' + str(core_idx))

    # load the input file as a dataframe
    bnd_info = pd.read_csv(bnd_file,
                           sep=delimiter,
                           quoting=csv.QUOTE_NONNUMERIC,
                           header=0,
                           doublequote=True,
                           low_memory=False)

    # value all bonds in one vectorised step instead of going row by row;
    # the explicit float conversion keeps the arithmetic well-defined even
    # when the file holds a header but no bond rows
    nominal = bnd_info['nominal'].to_numpy(dtype=float)
    yld = bnd_info['yld'].to_numpy(dtype=float)
    maturity = bnd_info['maturity'].to_numpy(dtype=float)
    # NOTE: numpy semantics apply - a zero discount base (yld == -1) yields
    # inf / nan together with a RuntimeWarning rather than an exception
    bnd_info['npv'] = nominal / ((1.0 + yld) ** maturity)

    # save the valued positions back into the same .csv file
    bnd_info.to_csv(bnd_file,
                    sep=delimiter,
                    quoting=csv.QUOTE_NONNUMERIC,
                    quotechar='"',
                    index=False)

    # core idx
    print(' npv_zcb() finished on core ' + str(core_idx))

    # everything OK
    return True
def main(cores_no, bnds_no, path, delimiter):
    """
    Generate random zero coupon bonds, store them in .csv files and compare
    the run time of a single-process valuation with a valuation spread
    among several processes.

    The function returns immediately unless the module is being executed
    as a script (__name__ == '__main__'), so merely importing the module
    (as a freshly started worker process may do) does not trigger a new
    run.

    inputs:
        cores_no: int
            number of processes to spread the valuation among
        bnds_no: int
            number of random bonds to be generated
        path: str
            folder for the .csv files; file names are appended to it
            directly, so it has to end with a path separator
        delimiter: str
            delimiter to be used in the .csv files
    outputs:
        path + 'bnd_aux.csv' with all bonds and one file
        path + 'bnd_aux_<core idx>.csv' per process; every file is passed
        to npv_zcb(). Elapsed times are printed.
    dependencies:
        npv_zcb()
    example:
        main(cores_no=2, bnds_no=1000000, path='C:\\temp\\', delimiter=',')
    """
    # do nothing unless run as a script
    if __name__ != '__main__':
        return

    mp.freeze_support()

    # generate random attributes of zero coupon bonds; the three draws are
    # made in the order maturity, nominal, yld and stored as floats
    print('Generating random zero coupon bonds...')
    bnd_info = pd.DataFrame({
        'maturity': np.random.randint(1, 31, size=bnds_no).astype(float),
        'nominal': np.random.randint(70, 151, size=bnds_no).astype(float),
        'yld': np.random.randint(0, 100, size=bnds_no) / 100,
    })

    # save bond positions into a .csv file
    all_bonds_file = path + 'bnd_aux.csv'
    bnd_info.to_csv(all_bonds_file,
                    sep=delimiter,
                    quoting=csv.QUOTE_NONNUMERIC,
                    quotechar='"',
                    index=False)

    # prepare one .csv file per core
    print('Preparing input files...')
    # balanced slice boundaries 0 = b[0] <= b[1] <= ... <= b[cores_no] =
    # bnds_no; every bond lands in exactly one slice even when bnds_no is
    # not a multiple of cores_no (a fixed step would drop the remainder)
    bounds = [bnds_no * k // cores_no for k in range(cores_no + 1)]
    core_files = []
    for core_idx in range(cores_no):
        # save bond positions into a .csv file
        file_name = path + 'bnd_aux_' + str(core_idx) + '.csv'
        bnd_info_aux = bnd_info.iloc[bounds[core_idx]:bounds[core_idx + 1]]
        bnd_info_aux.to_csv(file_name,
                            sep=delimiter,
                            quoting=csv.QUOTE_NONNUMERIC,
                            quotechar='"',
                            index=False)
        core_files.append(file_name)

    # SINGLE CORE
    print('Running single core...')
    start = timeit.default_timer()
    # evaluate bond positions
    npv_zcb(1, all_bonds_file, delimiter)
    print(' elapsed time: ', timeit.default_timer() - start, ' seconds')

    # MULTIPLE CORES
    # spread calculation among several cores
    print('Running multiprocessing...')
    print(' ', cores_no, ' core(s)...')
    start = timeit.default_timer()
    processes = []
    # go core by core
    print(' spreading calculation among processes...')
    for core_idx, file_name in enumerate(core_files):
        # run calculations
        process = mp.Process(target=npv_zcb,
                             args=(core_idx, file_name, delimiter))
        processes.append(process)
        process.start()

    # wait till every process is finished
    print(' waiting for all processes to finish...')
    for process in processes:
        process.join()
    print(' elapsed time: ', timeit.default_timer() - start, ' seconds')
# Script entry point: run the demo only when this file is executed directly,
# never as a side effect of importing it.
if __name__ == '__main__':
    main(cores_no=2,
         bnds_no=1000000,
         path='C:\\temp\\',
         delimiter=',')