Демультиплексирование значений из массива numpy - PullRequest
1 голос
/ 09 июля 2020

Устройство отправляет массив мультиплексированных кодов ошибок. Вы можете рассматривать мультиплексированные коды ошибок как своего рода кольцевой буфер FIFO, который может иметь переменную длину в зависимости от количества одновременно активных кодов ошибки. Я использую sh для демультиплексирования кодов ошибок в отдельные логические массивы.

Я ищу эффективный способ (т.е. удаление for l oop) для реализации следующего кода:

import numpy as np

def get_error_vector(error_mux_vector, error_id, period):
    index = np.where(error_mux_vector == error_id)[0]

    error_vector = np.zeros(np.size(error_mux_vector))

    for i in range(0, np.size(index) - 1):
        if (index[i + 1] - index[i]) <= 1 / period:
            error_vector[index[i]:index[i + 1] + 1] = 1

    return error_vector

Вот фиктивные значения для иллюстрации проблемы. 0 означает отсутствие ошибки; 1, 2 и 3 - коды ошибок. Мы предполагаем, что сигнал ошибки имеет частоту 5 Гц (период 0,2 с):

import matplotlib.pyplot as plt

error_signal = np.array([0,0,0,0,0,1,2,3,1,2,3,2,3,2,3,0,0,0,0,0,1,1,1,2,3,1,3,1,3,1,3,0,0,0,0,0,2,0,2,2])

error_vector_1 = get_error_vector(error_signal, 1, 0.2)
error_vector_2 = get_error_vector(error_signal, 2, 0.2)
error_vector_3 = get_error_vector(error_signal, 3, 0.2)

plt.plot(error_signal)
plt.plot(error_vector_1)
plt.plot(error_vector_2)
plt.plot(error_vector_3)
plt.legend(['array', 'error 1', 'error 2', 'error 3'])
plt.show()

Вывод сюжета

Фактические данные устройства могут содержать от 50 тыс. До 10 млн точек с примерно 100 возможными кодами ошибок. Это означает, что a for l oop действительно неэффективен для данного варианта использования. Я хотел бы улучшить этот код, но пока не нашел эффективного решения.

1 Ответ

0 голосов
/ 10 июля 2020

Вот векторизованный подход, который создает все векторы в одном go. Поставляется в двух вариантах. В моем случайном тестовом примере второй быстрее, но это может зависеть от точной статистики вашего сигнала.

import numpy as np

# dense strat
def demultiplex(signal,maxdist):
    n = signal.max()
    aux = np.zeros((n,len(signal)+1),np.int16)
    nz = signal.nonzero()[0]
    signal = signal[nz]
    idx = signal.argsort(kind="stable")
    valid = ((nz[idx[1:]]<=nz[idx[:-1]]+maxdist)&
             (signal[idx[1:]]==signal[idx[:-1]])).nonzero()[0]
    aux[signal[idx[valid]]-1,nz[idx[valid]]] = 1
    aux[signal[idx[valid+1]]-1,nz[idx[valid+1]]+1] -= 1
    out = (aux[:,:-1].cumsum(1) > 0).view(np.int8)
    return out

# sparse strat
def demultiplex2(signal,maxdist):
    n = signal.max()
    m = signal.size
    nz = signal.nonzero()[0]
    signal = signal[nz]
    idx = signal.argsort(kind="stable")
    delta = nz[idx[1:]] - nz[idx[:-1]]
    valid = ((delta<=maxdist)&(signal[idx[1:]]==signal[idx[:-1]])).nonzero()[0]
    delta = delta[valid]
    nz = nz[idx[valid]]
    nz[1:] -= nz[:-1] + delta[:-1]
    offsets = (delta+1).cumsum()
    x = np.ones(offsets[-1],int)
    x[0] = nz[0]
    x[offsets[:-1]] = nz[1:]
    out = np.zeros((n,m),np.uint8)
    out[(signal[idx[valid]]-1).repeat(delta+1),x.cumsum()] = 1
    return out

# OP
def get_error_vector(error_mux_vector, error_id, period):
    index = np.where(error_mux_vector == error_id)[0]
    error_vector = np.zeros(np.size(error_mux_vector),np.int8)
    for i in range(0, np.size(index) - 1):
        if (index[i + 1] - index[i]) <= 1 / period:
            error_vector[index[i]:index[i + 1] + 1] = 1
    return error_vector


#error_signal = np.array([0,0,0,0,0,1,2,3,1,2,3,2,3,2,3,0,0,0,0,0,1,1,1,2,3,1,3,1,3,1,3,0,0,0,0,0,2,0,2,2])
error_signal = np.random.randint(0,101,1000000)

import time

t=[]
t.append(time.time())
error_vector_1 = get_error_vector(error_signal, 1, 0.02)
error_vector_2 = get_error_vector(error_signal, 2, 0.02)
error_vector_3 = get_error_vector(error_signal, 3, 0.02)
t.append(time.time())
sol = demultiplex(error_signal,50)
t.append(time.time())
sol2 = demultiplex2(error_signal,50)
t.append(time.time())
print("time per error id [OP, pp, pp2]",np.diff(t)/(3,100,100))
print("results equal",end=" ")
print((error_vector_1==sol[0]).all(),end=" ")
print((error_vector_2==sol[1]).all(),end=" ")
print((error_vector_3==sol[2]).all(),end=" ")
print((error_vector_1==sol2[0]).all(),end=" ")
print((error_vector_2==sol2[1]).all(),end=" ")
print((error_vector_3==sol2[2]).all())

Пример выполнения:

time per error id [OP, pp, pp2] [0.02730425 0.00912964 0.00440736]
results equal True True True True True True

Объяснение:

  • мы сортируем сигнал, чтобы легко идентифицировать те коды ошибок, которые следуют сами за собой в достаточно близкое время.
  • мы располагаем векторы ошибок в стеке так, чтобы точки, которые должны быть установлены, могли быть адресованы с помощью сигнала координат [t], t
  • , чтобы установить отрезки временных точек, мы устанавливаем первый равным 1, а следующий за последним - равным -1 и формируем итоговую сумму - чтобы исправить перекрывающиеся отрезки, мы проверяем> 0 и приводим полученное логическое значение в int
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...