Векторизованная 2D-интерполяция - PullRequest
4 голосов
/ 15 октября 2019

Я использую функцию интерполяции 2D SciPy (scipy.interpolate.interp2d). У меня есть поле, которое хранится как Z = [x, y, p1, p2]. Я хотел бы интерполировать в каждой точке x и y, используя 2D матрицу [x, y, :, :]. Форма Z равна (40, 40, 6, 6).

Единственный способ сделать это - использовать цикл for и переходить через каждую точку. Ниже приведен код, который я использую для этого. a и b - это массивы, имеющие форму (6,) и (6,).

for i in range(40):
    for j in range(40):
        ff = interpolate.interp2d(a, b, z[i,j,:,:]) 
        zi[i,j] = ff(atest,btest)

Есть ли способ сделать это без использования цикла for?

1 Ответ

1 голос
/ 23 октября 2019

Поскольку вы, по сути, интерполируете кучу разных функций по одним и тем же 2d точкам, разумно, что нет встроенного способа сделать это. Хотя некоторые вычисления можно сэкономить, поскольку для каждой функции у вас есть одна и та же двумерная сетка, лежащая в основе интерполяции, поэтому ожидание также невозможно. В любом случае, я не мог найти встроенный способ сделать это без зацикливания.

К счастью, с помощью некоторой коленной смазки мы можем реализовать билинейный интерполятор (так же, как петлевой)которые используют векторизацию для работы на всех точках одновременно. Вот класс, который реализует это:

import numpy as np 

class VectorizedBilinearInterpolator: 
    def __init__(self, x0, y0, z0): 
        """Create a bilinear interpolator for gridded input data 

        Inputs: 
            x0: shape (ngridx,)
            y0: shape (ngridy,)
            z0: shape batch_shape + (ngridy, ngridx) (viewed as batches) 
        """

        if z0.shape[-2:] != y0.shape + x0.shape: 
            raise ValueError("The last two dimensions of z0 must match that of y0 and x0, respectively!") 

        ind_x = np.argsort(x0) 
        self.x0 = x0[ind_x] 

        ind_y = np.argsort(y0) 
        self.y0 = y0[ind_y] 

        self.batch_shape = z0.shape[:-2] 
        indexer = np.ix_(ind_y, ind_x) 
        self.z0 = z0[..., indexer[0], indexer[1]].reshape(-1, y0.size, x0.size)  # shape (nbatch, ngridy, ngridx)

        # compute auxiliary coefficients for interpolation 
        # we have ngridx-1 boxes along x and ngridy-1 boxes along y
        # for each box we need 4 coefficients for bilinear interpolation
        # see e.g. https://en.wikipedia.org/wiki/Bilinear_interpolation#Alternative_algorithm
        # construct a batch of matrices with size (ngridy-1, ngridx-1, 4, 4) to invert
        x1 = self.x0[:-1]
        x2 = self.x0[1:]
        y1 = self.y0[:-1, None]
        y2 = self.y0[1:, None]
        x1,x2,y1,y2,one = np.broadcast_arrays(x1, x2, y1, y2, 1)  # all shaped (ngridy-1, ngridx-1)

        M = np.array([[one, x1, y1, x1*y1], [one, x1, y2, x1*y2],
                      [one, x2, y1, x2*y1], [one, x2, y2, x2*y2]]).transpose(2, 3, 0, 1)  # shape (ngridy-1, ngridx-1, 4, 4)
        zvec = np.array([self.z0[:, :-1, :-1], self.z0[:, 1:, :-1], self.z0[:, :-1, 1:], self.z0[:, 1:, 1:]])  # shape (4, nbatch, ngridy-1, ngridx-1)

        self.coeffs = np.einsum('yxab,bnyx -> nyxa', np.linalg.inv(M), zvec)  # shape (nbatch, ngridy-1, ngridx-1, 4) for "a0,a1,a2,a3" coefficients
        # for a given box (i,j) the interpolated value is given by self.coeffs[:,i,j,:] @ [1, x, y, x*y]

    def __call__(self, x, y): 
        """Evaluate the interpolator at the given coordinates 

        Inputs: 
            x: shape (noutx,) 
            y: shape (nouty,) 

        Output: 
            z: shape batch_shape + (nouty, noutx) (see __init__) 
        """

        # identify outliers (and mask at the end)
        out_x = (x < self.x0[0]) | (self.x0[-1] < x)
        out_y = (y < self.y0[0]) | (self.y0[-1] < y)

        # clip outliers, mask later
        xbox = (self.x0.searchsorted(x) - 1).clip(0, self.x0.size - 2)  # shape (noutx,) indices
        ybox = (self.y0.searchsorted(y) - 1).clip(0, self.y0.size - 2)  # shape (nouty,) indices
        indexer = np.ix_(ybox, xbox)

        xgrid,ygrid = np.meshgrid(x, y)  # both shape (nouty, noutx)

        coeffs_now = self.coeffs[:, indexer[0], indexer[1], :]  # shape (nbatch, nouty, noutx, 4)
        poly = np.array([np.ones_like(xgrid), xgrid, ygrid, xgrid*ygrid])  # shape (4, nouty, noutx)
        values =  np.einsum('nyxa,ayx -> nyx', coeffs_now, poly)  # shape (nbatch, nouty, noutx)

        # reshape final result and mask outliers
        z = values.reshape(self.batch_shape + xgrid.shape)
        z[..., out_y, :] = np.nan
        z[..., :, out_x] = np.nan

        return z

И вот несколько тестов:

from scipy import interpolate  # only needed for the loopy comparison

# input points
x0 = np.linspace(0, 1, 6)
y0 = np.linspace(0, 1, 7)
z0 = np.random.rand(40, 41, y0.size, x0.size)

# output points
xtest = np.linspace(0, 1, 20)
ytest = np.linspace(0, 1, 21)

# the original loopy version
def loopy():
    zi = np.empty(z0.shape[:2] + (ytest.size, xtest.size))
    for i in range(z0.shape[0]):
        for j in range(z0.shape[1]):
            ff = interpolate.interp2d(x0, y0, z0[i,j,:,:])
            zi[i,j] = ff(xtest, ytest)
    return zi

# the new, vectorized version
def vector():
    f = VectorizedBilinearInterpolator(x0, y0, z0)
    zi = f(xtest, ytest)
    return zi

Во-первых, мы должны убедиться, что два интерполятора делают одно и то же:

>>> np.allclose(loopy(), vector())
True

Во-вторых, мы можем рассчитать этот фиктивный пример, используя магию ipython %timeit из-за лени:

>>> %timeit loopy()
... %timeit vector()
216 ms ± 2.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
25.9 ms ± 1.08 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Так что, по крайней мере, в этом небольшом примере векторизованная версия работает в 8 раз быстрее. Фактическое ускорение сильно зависит от размера вашего реального примера. Хотя я попытался правильно реализовать выбросы (т. Е. Установить выбросы на np.nan вместо экстраполяции глупостей), я не проверил тщательно все граничные случаи, поэтому убедитесь, что вы протестировали реализацию на реальных данных (сравнивая цикл сновый интерполятор для нескольких типичных случаев).


Если ваш вопрос содержит реальную проблему, то есть окончательный массив формы (40,40,6,6), то ни одна из этих работ не стоит вашего времени (учитываяочень короткое время выполнения), и вы должны просто использовать цикл.

...