Вот векторизованный подход, который создает все векторы в одном go. Поставляется в двух вариантах. В моем случайном тестовом примере второй быстрее, но это может зависеть от точной статистики вашего сигнала.
import numpy as np
# dense strat
def demultiplex(signal,maxdist):
n = signal.max()
aux = np.zeros((n,len(signal)+1),np.int16)
nz = signal.nonzero()[0]
signal = signal[nz]
idx = signal.argsort(kind="stable")
valid = ((nz[idx[1:]]<=nz[idx[:-1]]+maxdist)&
(signal[idx[1:]]==signal[idx[:-1]])).nonzero()[0]
aux[signal[idx[valid]]-1,nz[idx[valid]]] = 1
aux[signal[idx[valid+1]]-1,nz[idx[valid+1]]+1] -= 1
out = (aux[:,:-1].cumsum(1) > 0).view(np.int8)
return out
# sparse strat
def demultiplex2(signal,maxdist):
n = signal.max()
m = signal.size
nz = signal.nonzero()[0]
signal = signal[nz]
idx = signal.argsort(kind="stable")
delta = nz[idx[1:]] - nz[idx[:-1]]
valid = ((delta<=maxdist)&(signal[idx[1:]]==signal[idx[:-1]])).nonzero()[0]
delta = delta[valid]
nz = nz[idx[valid]]
nz[1:] -= nz[:-1] + delta[:-1]
offsets = (delta+1).cumsum()
x = np.ones(offsets[-1],int)
x[0] = nz[0]
x[offsets[:-1]] = nz[1:]
out = np.zeros((n,m),np.uint8)
out[(signal[idx[valid]]-1).repeat(delta+1),x.cumsum()] = 1
return out
# OP
def get_error_vector(error_mux_vector, error_id, period):
index = np.where(error_mux_vector == error_id)[0]
error_vector = np.zeros(np.size(error_mux_vector),np.int8)
for i in range(0, np.size(index) - 1):
if (index[i + 1] - index[i]) <= 1 / period:
error_vector[index[i]:index[i + 1] + 1] = 1
return error_vector
#error_signal = np.array([0,0,0,0,0,1,2,3,1,2,3,2,3,2,3,0,0,0,0,0,1,1,1,2,3,1,3,1,3,1,3,0,0,0,0,0,2,0,2,2])
error_signal = np.random.randint(0,101,1000000)
import time
t=[]
t.append(time.time())
error_vector_1 = get_error_vector(error_signal, 1, 0.02)
error_vector_2 = get_error_vector(error_signal, 2, 0.02)
error_vector_3 = get_error_vector(error_signal, 3, 0.02)
t.append(time.time())
sol = demultiplex(error_signal,50)
t.append(time.time())
sol2 = demultiplex2(error_signal,50)
t.append(time.time())
print("time per error id [OP, pp, pp2]",np.diff(t)/(3,100,100))
print("results equal",end=" ")
print((error_vector_1==sol[0]).all(),end=" ")
print((error_vector_2==sol[1]).all(),end=" ")
print((error_vector_3==sol[2]).all(),end=" ")
print((error_vector_1==sol2[0]).all(),end=" ")
print((error_vector_2==sol2[1]).all(),end=" ")
print((error_vector_3==sol2[2]).all())
Пример выполнения:
time per error id [OP, pp, pp2] [0.02730425 0.00912964 0.00440736]
results equal True True True True True True
Объяснение:
- мы сортируем сигнал, чтобы легко идентифицировать те коды ошибок, которые следуют сами за собой в достаточно близкое время.
- мы располагаем векторы ошибок в стеке так, чтобы точки, которые должны быть установлены, могли быть адресованы с помощью сигнала координат [t], t
- , чтобы установить отрезки временных точек, мы устанавливаем первый равным 1, а следующий за последним - равным -1 и формируем итоговую сумму - чтобы исправить перекрывающиеся отрезки, мы проверяем> 0 и приводим полученное логическое значение в int