итерация по указанной оси в Cython - PullRequest
0 голосов
/ 17 сентября 2018

Я изучаю Cython, и я изменил код в учебнике , чтобы попытаться провести численное дифференцирование:

import numpy as np
cimport numpy as np
import cython
np.import_array()

def test3(a, int order=2, int axis=-1):

    cdef int i

    if axis<0:
        axis = len(a.shape) + axis

    out = np.empty(a.shape, np.double)

    cdef np.flatiter ita = np.PyArray_IterAllButAxis(a, &axis)
    cdef np.flatiter ito = np.PyArray_IterAllButAxis(out, &axis)

    cdef int a_axis_stride = a.strides[axis]
    cdef int o_axis_stride = out.strides[axis]

    cdef int axis_length = out.shape[axis]

    cdef double value


    while np.PyArray_ITER_NOTDONE(ita):
        # first element
        pt1 = <double*>((<char*>np.PyArray_ITER_DATA(ita)))
        pt2 = (<double*>((<char*>np.PyArray_ITER_DATA(ita)) + 1*a_axis_stride))
        pt3 = (<double*>((<char*>np.PyArray_ITER_DATA(ita)) + 2*a_axis_stride))
        value = -1.5*pt1[0] + 2*pt2[0] - 0.5*pt3[0]
        (<double*>((<char*>np.PyArray_ITER_DATA(ito))))[0] = value

        for i in range(axis_length-2):
            pt1 = (<double*>((<char*>np.PyArray_ITER_DATA(ita)) + i*a_axis_stride))
            pt2 = (<double*>((<char*>np.PyArray_ITER_DATA(ita)) + (i+2)*a_axis_stride))
            value = -0.5*pt1[0] + 0.5*pt2[0]
            (<double*>((<char*>np.PyArray_ITER_DATA(ito)) + (i+1)*o_axis_stride))[0] = value

        # last element
        pt1 = (<double*>((<char*>np.PyArray_ITER_DATA(ita))+ (axis_length-3)*a_axis_stride))
        pt2 = (<double*>((<char*>np.PyArray_ITER_DATA(ita))+ (axis_length-2)*a_axis_stride))
        pt3 = (<double*>((<char*>np.PyArray_ITER_DATA(ita))+ (axis_length-1)*a_axis_stride))
        value = 1.5*pt3[0] - 2*pt2[0] + 0.5*pt1[0]
        (<double*>((<char*>np.PyArray_ITER_DATA(ito))+(axis_length-1)*o_axis_stride))[0] = value


        np.PyArray_ITER_NEXT(ita)
        np.PyArray_ITER_NEXT(ito)

    return out

Код дает правильные результаты, и он действительно быстрее, чеммоя собственная реализация numpy без Cython.Проблема заключается в следующем:

  1. Я думал только о наличии одного оператора pt1 = (<double*>((<char*>np.PyArray_ITER_DATA(ita)) + i*a_axis_stride)), а затем использовал pt1[0], pt1[-1], pt1[1] для доступа к элементам массива, но этоработает только если указанная ось является последней.Если я различаю другую ось (не последнюю), то (<double*>((<char*>np.PyArray_ITER_DATA(ita)) + i*a_axis_stride)) указывает на правую, а pt[-1] и pt[1] указывают на элементы непосредственно до и после pt[0], который находится вдоль последней оси,Текущая версия работает, но если я хочу реализовать дифференцирование более высокого порядка, которое требует большего количества баллов для оценки, то в итоге у меня будет много таких строк, и я не уверен, что есть лучшие / более эффективные способы сделать это, используяpt[1] или что-то вроде pt[xxx] для доступа к соседним точкам (вдоль указанной оси).

  2. Есть ли другие способы ускорить этот фрагмент кода?Я ищу мелкие детали, которые я мог упустить из виду или тонкие вещи, которые могут иметь большой эффект.

1 Ответ

0 голосов
/ 25 сентября 2018

К моему небольшому удивлению, я не могу превзойти вашу версию, используя типизированные представления памяти на языке Cython - итераторы выглядят довольно быстро.Однако я думаю, что смогу значительно увеличить читабельность, чтобы позволить вам использовать синтаксис нарезки Python.Единственное ограничение заключается в том, что входной массив должен быть смежным с C, чтобы его можно было легко изменять (я думаю, что смежный Fortran также может работать, но я не проверял)

Основной прием - выровнять все осидо и после выбранной оси, так что это известная трехмерная фигура, в которой вы можете использовать просмотры памяти Cython.

@cython.boundscheck(False)
def test4(a,order=2,axis=-1):
    assert a.flags['C_CONTIGUOUS'] # otherwise the reshape doesn't work
    before = np.product(a.shape[:axis])
    after = np.product(a.shape[(axis+1):])
    cdef double[:,:,::1] a_new = a.reshape((before, a.shape[axis], after)) # this should not involve copying memory - it's just a new view
    cdef double[:] a_slice

    cdef double[:,:,::1] out = np.empty_like(a_new)

    assert a_new.shape[1] > 3

    cdef int m,n,i

    for m in range(a_new.shape[0]):
        for n in range(a_new.shape[2]):
            a_slice = a_new[m,:,n]

            out[m,0,n] = -1.5*a_slice[0] + 2*a_slice[1] - 0.5*a_slice[2]


            for i in range(a_slice.shape[0]-2):
                out[m,i+1,n] = -0.5*a_slice[i] + 0.5*a_slice[i+2]

            # last element
            out[m,-1,n] = 1.5*a_slice[-1] - 2*a_slice[-2] + 0.5*a_slice[-3]

    return np.asarray(out).reshape(a.shape)

Скорость, по-моему, немного медленнее, чем у вашей версии.


С точки зрения улучшения вашего кода, вы можете обработать шаг в двойных числах вместо байтов (a_axis_stride_dbl = a_axis_stride/sizeof(double)) и затем индексировать как pt[i*a_axis_stride_dbl]).Вероятно, он не наберет много скорости, но будет более читабельным.(Это то, о чем вы спрашиваете в пункте 1)

...