Хорошо написанный пример того, как взять более крупную задачу (то есть набор данных), разбить ее на куски и обработать по частям в numba CUDA, - здесь . В частности, интересующий вариант - pricer_cuda_overlap.py
. К сожалению, в этом примере используется то, что, как я считаю, является устаревшей функцией генерации случайных чисел в accelerate.cuda.rand
, поэтому в сегодняшней numba его нельзя запустить напрямую (я думаю).
Однако для целей данного вопроса процесс генерации случайных чисел не имеет значения, и поэтому мы можем просто удалить его, не затрагивая важные наблюдения. Далее следует один файл, собранный из разных частей в разных файлах в этом примере:
$ cat t45.py
#! /usr/bin/env python
"""
This version demonstrates copy-compute overlapping through multiple streams.
"""
from __future__ import print_function
import math
import sys
import numpy as np
from numba import cuda, jit
from math import sqrt, exp
from timeit import default_timer as timer
from collections import deque
StockPrice = 20.83
StrikePrice = 21.50
Volatility = 0.021 # per year
InterestRate = 0.20
Maturity = 5. / 12.
NumPath = 500000
NumStep = 200
def driver(pricer, pinned=False):
paths = np.zeros((NumPath, NumStep + 1), order='F')
paths[:, 0] = StockPrice
DT = Maturity / NumStep
if pinned:
from numba import cuda
with cuda.pinned(paths):
ts = timer()
pricer(paths, DT, InterestRate, Volatility)
te = timer()
else:
ts = timer()
pricer(paths, DT, InterestRate, Volatility)
te = timer()
ST = paths[:, -1]
PaidOff = np.maximum(paths[:, -1] - StrikePrice, 0)
print('Result')
fmt = '%20s: %s'
print(fmt % ('stock price', np.mean(ST)))
print(fmt % ('standard error', np.std(ST) / sqrt(NumPath)))
print(fmt % ('paid off', np.mean(PaidOff)))
optionprice = np.mean(PaidOff) * exp(-InterestRate * Maturity)
print(fmt % ('option price', optionprice))
print('Performance')
NumCompute = NumPath * NumStep
print(fmt % ('Mstep/second', '%.2f' % (NumCompute / (te - ts) / 1e6)))
print(fmt % ('time elapsed', '%.3fs' % (te - ts)))
class MM(object):
"""Memory Manager
Maintain a freelist of device memory for reuse.
"""
def __init__(self, shape, dtype, prealloc):
self.device = cuda.get_current_device()
self.freelist = deque()
self.events = {}
for i in range(prealloc):
gpumem = cuda.device_array(shape=shape, dtype=dtype)
self.freelist.append(gpumem)
self.events[gpumem] = cuda.event(timing=False)
def get(self, stream=0):
assert self.freelist
gpumem = self.freelist.popleft()
evnt = self.events[gpumem]
if not evnt.query(): # not ready?
# querying is faster then waiting
evnt.wait(stream=stream) # future works must wait
return gpumem
def free(self, gpumem, stream=0):
evnt = self.events[gpumem]
evnt.record(stream=stream)
self.freelist.append(gpumem)
if sys.version_info[0] == 2:
range = xrange
@jit('void(double[:], double[:], double, double, double, double[:])',
target='cuda')
def cu_step(last, paths, dt, c0, c1, normdist):
i = cuda.grid(1)
if i >= paths.shape[0]:
return
noise = normdist[i]
paths[i] = last[i] * math.exp(c0 * dt + c1 * noise)
def monte_carlo_pricer(paths, dt, interest, volatility):
n = paths.shape[0]
num_streams = 2
part_width = int(math.ceil(float(n) / num_streams))
partitions = [(0, part_width)]
for i in range(1, num_streams):
begin, end = partitions[i - 1]
begin, end = end, min(end + (end - begin), n)
partitions.append((begin, end))
partlens = [end - begin for begin, end in partitions]
mm = MM(shape=part_width, dtype=np.double, prealloc=10 * num_streams)
device = cuda.get_current_device()
blksz = device.MAX_THREADS_PER_BLOCK
gridszlist = [int(math.ceil(float(partlen) / blksz))
for partlen in partlens]
strmlist = [cuda.stream() for _ in range(num_streams)]
# Allocate device side array - in original example this would be initialized with random numbers
d_normlist = [cuda.device_array(partlen, dtype=np.double, stream=strm)
for partlen, strm in zip(partlens, strmlist)]
c0 = interest - 0.5 * volatility ** 2
c1 = volatility * math.sqrt(dt)
# Configure the kernel
# Similar to CUDA-C: cu_monte_carlo_pricer<<<gridsz, blksz, 0, stream>>>
steplist = [cu_step[gridsz, blksz, strm]
for gridsz, strm in zip(gridszlist, strmlist)]
d_lastlist = [cuda.to_device(paths[s:e, 0], to=mm.get(stream=strm))
for (s, e), strm in zip(partitions, strmlist)]
for j in range(1, paths.shape[1]):
d_pathslist = [cuda.to_device(paths[s:e, j], stream=strm,
to=mm.get(stream=strm))
for (s, e), strm in zip(partitions, strmlist)]
for step, args in zip(steplist, zip(d_lastlist, d_pathslist, d_normlist)):
d_last, d_paths, d_norm = args
step(d_last, d_paths, dt, c0, c1, d_norm)
for d_paths, strm, (s, e) in zip(d_pathslist, strmlist, partitions):
d_paths.copy_to_host(paths[s:e, j], stream=strm)
mm.free(d_last, stream=strm)
d_lastlist = d_pathslist
for strm in strmlist:
strm.synchronize()
if __name__ == '__main__':
driver(monte_carlo_pricer, pinned=True)
$ python t45.py
Result
stock price: 22.6720614385
standard error: 0.0
paid off: 1.17206143849
option price: 1.07834858009
Performance
Mstep/second: 336.40
time elapsed: 0.297s
$
В этом примере многое происходит, и общая тема о том, как писать конвейерный / перекрывающийся код в CUDA, сама по себе будет полным ответом, поэтому я просто расскажу об основных моментах. Общая тема хорошо освещена в этом сообщении в блоге хотя и с учетом CUDA C ++, а не numba CUDA (python). Однако существует соответствие 1: 1 между большинством объектов, представляющих интерес в numba CUDA, и их эквивалентами в CUDA C ++. Поэтому я предполагаю, что основные понятия, такие как потоки CUDA, и то, как они используются для организации асинхронной параллельной деятельности, понятны.
Так, что делает этот пример? Я сосредоточусь в основном на аспектах CUDA.
- с учетом наложения операций копирования и вычислений входные данные (
paths
) преобразуются в закрепленную память CUDA на хосте
- с целью обработки работы в блоках определен менеджер памяти (
MM
), который позволит повторно использовать выделенные фрагменты памяти устройства по мере обработки.
- списки Python создаются для представления последовательности обработки фрагментов. Существует список, который определяет начало и конец каждого чанка или раздела. Существует список, который определяет последовательность потоков cuda, которые будут использоваться. Существует список разделов массива данных, которые будет использовать ядро CUDA.
- затем, с этими списками, происходит выдача работы в «порядке глубины первого порядка». Для каждого потока данные (чанки), необходимые для этого потока, передаются на устройство (в очередь для передачи), ядро, которое будет обрабатывать эти данные, запускается (ставится в очередь), и передача, которая будет отправлять результаты из этого чанка обратно в память хоста поставлена в очередь Этот процесс повторяется в цикле
for j
в monte_carlo_pricer
для количества шагов (paths.shape[1]
).
Когда я запускаю приведенный выше код с помощью профилировщика, мы можем видеть временную шкалу, которая выглядит следующим образом:
В данном конкретном случае я запускаю его на Quadro K2000, который является старым, небольшим графическим процессором, имеющим только один механизм копирования. Поэтому в профиле мы видим, что не более одной операции копирования перекрывается с активностью ядра CUDA, и нет операций копирования, перекрывающихся с другими операциями копирования. Однако, если бы я запустил это на устройстве с двумя механизмами копирования, я ожидал бы, что возможна более плотная / плотная временная шкала, с перекрытием 2 операций копирования и операции вычисления одновременно для максимальной пропускной способности. Для этого необходимо увеличить количество используемых потоков (num_streams
) как минимум до 3.