NumPy: быстрое регулярное среднее значение для большого количества отрезков / точек - PullRequest
0 голосов
/ 13 сентября 2018

У меня есть много (~ 1 миллион) нерегулярно расположенных точек P вдоль 1D линии.Они отмечают сегменты линии так, что если точки являются {0, x_a, x_b, x_c, x_d, ...}, сегменты переходят из 0-> x_a, x_a-> x_b, x_b-> x_c, x_c-> x_d и т. д. У меня также есть значение y для каждого сегмента, которое я хочу интерпретировать как глубину цвета.Мне нужно нарисовать эту линию в виде изображения, но может быть доступно, скажем, 1000 пикселей для представления всей длины линии.Эти пиксели, конечно, соответствуют регулярно расположенным интервалам вдоль линии, скажем, в 0..X1, X1..X2, X2..X3 и т. Д., Где X1, X2, X3 регулярно расположены.Чтобы определить цвет для каждого пикселя, мне нужно взять среднее значение всех значений y, которые попадают в правильно расположенные пиксельные границы, взвешенные по длине сегмента, попадающего в этот интервал.Также могут быть пиксели, которые не содержат никакого значения в P и которые просто принимают значение цвета, определенное сегментом, проходящим через весь пиксель.

Это похоже на то, что, вероятно, нужно сделатьмного в анализе изображений.Так есть ли название для этой операции, и какой самый быстрый способ в numpy вычислить такой равномерно распределенный набор средних значений y?Полагаю, это немного похоже на интерполяцию, только я не хочу брать среднее значение только для двух окружающих точек, а средневзвешенное значение для всех точек в пределах регулярного интервала (плюс немного перекрытия).

[Edit - добавлен минимальный пример]

Так, есть 5 сегментов вдоль горизонтальной линии, разделенных [0, 1.1, 2.2, 2.3, 2.8, 4] (то есть линия идет от 0 до 4),Предположим, что каждый из сегментов принимает произвольные значения затенения, например, мы могли бы иметь 5 значений затенения [0,0.88,0.55,0.11,0.44] - где 0 - черный, а 1 - белый.Тогда, если бы я хотел построить это с использованием 4 пикселей, мне нужно было бы создать 4 значения, от 0 ... 1, 1 ... 2 и т. Д., И ожидать, что вычисление будет возвращать следующие значения для каждого:

0 ... 1 = 0 (это охватывается первым отрезком линии, 0-> 1.1)

1 ... 2 = 0,1 * 0 + 0,9 * 0,88 (1 ... 1,1покрыта первым отрезком линии, остальное вторым)

2 ... 3 = 0,2 * 0,88, 0,1 * 0,55 + 0,5 * 0,11 + 0,2 * 0,44 (это покрывается вторым допятые отрезки)

3 ... 4 = 0,44 (это охватывается последним отрезком линии, 2,8-> 4)

Принимая во внимание, что если бы я хотел поместить эти данные в 2-длинной в пиксель 2 пикселя будут иметь следующие значения:

0 ... 2 = 1,1 / 2 * 0 + 0,9 / 2 * 0,88

2 ... 4 = 0,2/ 2 * 0,88 + 0,1 / 2 * 0,55 + 0,5 / 2 * 0,11 + 1,2 * 0,44

Это похоже на "правильный" способ сделать даунсамплинг вдоль 1-й линии.Я ищу быструю реализацию (в идеале что-то встроенное), когда у меня есть (скажем) миллион точек вдоль линии и всего 1000 (или около того) пикселей, чтобы их уместить.

Ответы [ 3 ]

0 голосов
/ 13 сентября 2018

Вы все еще можете сделать это с помощью линейной интерполяции.Хотя ваша функция кусочно-постоянна, вы хотите, чтобы ее среднее значение за целый ряд небольших интервалов.Среднее значение некоторой функции f ( x ) за интервал от a до b представляет собой только ее интеграл в этом диапазоне, деленный наразница между a и b .Интеграл кусочно-постоянной функции будет кусочно-линейной функцией.Итак, предположим, что у вас есть ваши данные:

x = [0, 1.1, 2.2, 2.3, 2.8, 4]
y = [0, 0.88, 0.55, 0.11, 0.44]

Создайте функцию, которая даст свой интеграл при любом значении x .Здесь массив I будет содержать значение интеграла для каждого из заданных вами значений x , а функция f является ее линейным интерполантом, который даст точное значение в любой точке:

I = numpy.zeros_like(x)
I[1:] = numpy.cumsum(numpy.diff(x) * y)
f = scipy.interpolate.interp1d(x, I)

Теперь оценить среднее значение для каждого из ваших пикселей очень просто:

pix_x = numpy.linspace(0, 4, 5)
pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])

Мы можем проверить, что находится в этих массивах:

>>> pix_x
array([0., 1., 2., 3., 4.])
>>> pix_y
array([0.   , 0.792, 0.374, 0.44 ])

Значения затенения дляваши пиксели теперь в pix_y.Они должны точно соответствовать значениям, указанным выше в вашем примере.

Это должно быть достаточно быстро даже для многих, многих точек:

def test(x, y):
    I = numpy.zeros_like(x)
    I[1:] = numpy.cumsum(numpy.diff(x) * y)
    f = scipy.interpolate.interp1d(x, I,
        bounds_error=False, fill_value=(0, I[-1]))
    pix_x = numpy.linspace(0, 1, 1001)
    pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])
    return pix_y

timeit отчетов:

225 ms ± 37.6 ms per loop

в моей системе, когда x имеет размер 1000000 (и y имеет размер 999999).Обратите внимание, что bounds_error=False и fill_value=(0, I[-1]) передаются в interp1d.Это предполагает, что ваша функция затенения равна нулю вне диапазона значений x .Кроме того, interp1d не требует сортировки входных значений;В приведенном выше тесте я дал значения x и y как массивы равномерных случайных чисел от 0 до 1. Однако, если вы точно знаете, что они отсортированы, вы можете передать assume_sorted=True и вы должны получить увеличение скорости:

20.2 ms ± 377 µs per loop
0 голосов
/ 13 сентября 2018

Как и следовало ожидать, для этой проблемы есть чисто клочковатое решение. Хитрость заключается в том, чтобы хитро смешать np.searchsorted, который поместит вашу обычную сетку в ближайший лоток оригинала, и np.add.reduceat для вычисления сумм лотков:

import numpy as np

def distribute(x, y, n):
    """
    Down-samples/interpolates the y-values of each segment across a
    domain with `n` points. `x` represents segment endpoints, so should
    have one more element than `y`. 
    """
    y = np.asanyarray(y)
    x = np.asanyarray(x)

    new_x = np.linspace(x[0], x[-1], n + 1)
    # Find the insertion indices
    locs = np.searchsorted(x, new_x)[1:]
    # create a matrix of indices
    indices = np.zeros(2 * n, dtype=np.int)
    # Fill it in
    dloc = locs[:-1] - 1
    indices[2::2] = dloc
    indices[1::2] = locs

    # This is the sum of every original segment a new segment touches
    weighted = np.append(y * np.diff(x), 0)
    sums = np.add.reduceat(weighted, indices)[::2]

    # Now subtract the adjusted portions from the right end of the sums
    sums[:-1] -= (x[dloc + 1] - new_x[1:-1]) * y[dloc]
    # Now do the same for the left of each interval
    sums[1:] -= (new_x[1:-1] - x[dloc]) * y[dloc]

    return new_x, sums / np.diff(new_x)


seg = [0, 1.1, 2.2, 2.3, 2.8, 4]
color = [0, 0.88, 0.55, 0.11, 0.44]

seg, color = distribute(seg, color, 4)
print(seg, color)

Результат

[0. 1. 2. 3. 4.] [0.    0.792 0.374 0.44 ]

Это именно то, что вы ожидали в своих ручных вычислениях.

Ориентиры

Я запустил следующий набор тестов, чтобы убедиться, что и решение EE_ , и мое согласились с ответом, и проверить время. Я немного изменил другое решение, чтобы иметь тот же интерфейс, что и у меня:

from scipy.interpolate import interp1d

def EE_(x, y, n):
    I = np.zeros_like(x)
    I[1:] = np.cumsum(np.diff(x) * y)
    f = interp1d(x, I, bounds_error=False, fill_value=(0, I[-1]))
    pix_x = np.linspace(x[0], x[-1], n + 1)
    pix_y = (f(pix_x[1:]) - f(pix_x[:-1])) / (pix_x[1:] - pix_x[:-1])
    return pix_x, pix_y

А вот и тестовый стенд (метод MadPhysicist только что переименован из функции distribute выше). Ввод всегда составляет 1001 элемент для x и 1000 элементов для y. Выходные числа 5, 10, 100, 1000, 10000:

np.random.seed(0x1234ABCD)

x = np.cumsum(np.random.gamma(3.0, 0.2, size=1001))
y = np.random.uniform(0.0, 1.0, size=1000)

tests = (
    MadPhysicist,
    EE_,
)

for n in (5, 10, 100, 1000, 10000):
    print(f'N = {n}')
    results = {test.__name__: test(x, y, n) for test in tests}

    for name, (x_out, y_out) in results.items():
        print(f'{name}:\n\tx = {x_out}\n\ty = {y_out}')

    allsame = np.array([[np.allclose(x1, x2) and np.allclose(y1, y2)
                         for x2, y2 in results.values()]
                        for x1, y1 in results.values()])
    print()
    print(f'Result Match:\n{allsame}')

    from IPython import get_ipython
    magic = get_ipython().magic

    for test in tests:
        print(f'{test.__name__}({n}):\n\t', end='')
        magic(f'timeit {test.__name__}(x, y, n)')

Я пропущу распечатки данных и соглашений (результаты идентичны) и покажу время:

N = 5
MadPhysicist: 50.6 µs ± 349 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_:           110 µs ± 568 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

N = 10
MadPhysicist: 50.5 µs ± 732 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_:           111 µs ± 635 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)   

N = 100
MadPhysicist: 54.5 µs ± 284 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_:           114 µs ± 215 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

N = 1000
MadPhysicist: 107 µs ± 5.73 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
EE_:          148 µs ± 5.11 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

N = 10000
MadPhysicist: 458 µs ± 2.21 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
EE_:          301 µs ± 4.57 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

Вы можете видеть, что при меньших выходных размерах решение NumPy намного быстрее, возможно, потому что преобладают накладные расходы. Тем не менее, при большем числе точек останова решение scipy становится намного быстрее. Вам нужно сравнить при разных входных размерах, чтобы получить реальное представление о том, как работает синхронизация, а не просто о разном выходном размере.

0 голосов
/ 13 сентября 2018

Учитывая ваше ключевое требование, что вы не хотите линейной интерполяции, вы должны взглянуть на использование scipy.signal.resample .

Что это будет делать, это преобразовать ваш сигнал в спектр, а затем в новый временной ряд, который регулярно расположен вдоль оси х.

Также смотрите этот вопрос: Как равномерно пересчитать неоднородный сигнал, используя scipy .

...