Мне удалось заставить мой код работать благодаря @ SıddıkAçıl.Модель производителя-потребителя - вот что делает трюк.Я также понял, что процессы могут работать успешно, но если кто-то не сохраняет окончательные результаты в «очереди результатов», то он исчезает.Под этим я подразумеваю, что я заполнил значения в моих пустых массивах val_max и arg_max, позволив процессу запускаться (), но когда я их вызывал, они все еще были массивами np.zero.Я проверил, что они заполнили правильные массивы, напечатав их как раз в тот момент, когда процесс собирается завершиться (наконец, self.T в итерации).Поэтому вместо того, чтобы печатать их, я просто добавил их в многопроцессорный объект Queue на последней итерации, чтобы захватить весь заполненный массив.
Я предоставляю мой обновленный рабочий код ниже.ПРИМЕЧАНИЕ: он работает, но занимает вдвое больше времени, чем серийная версия.Я думаю о том, почему это может быть так:
- Я могу заставить его работать как два процесса, но на самом деле не знаю, как это сделать правильно.Опытные программисты могут знать, как это исправить с помощью параметра chunksize.
- Два процесса, которые я разделяю, - это операции с пустыми матрицами.Эти процессы уже выполняются настолько быстро, что издержки параллелизма (многопроцессорности) не стоят теоретического улучшения.Если бы эти два процесса были первыми для циклов (как они используются в Википедии и большинстве реализаций), тогда многопроцессорность могла бы дать выигрыш (возможно, мне следует изучить это).Более того, поскольку у нас есть шаблон «производитель-потребитель», а не два независимых процесса (шаблон «производитель-производитель»), мы можем ожидать, что шаблон «производитель-потребитель» будет работать до тех пор, пока самый длинный из двух процессов (в этом случае производитель занимает в два раза больше времени).пока потребитель).Мы не можем ожидать, что время выполнения сократится вдвое, как в сценарии производитель-производитель (это произошло с моим параллельным алгоритмом фильтрации HMM вперед-назад).
- Мой компьютер имеет 4 ядра, и numpy уже выполняет встроенную многопроцессорную оптимизацию ЦП.на его операции.Пытаясь использовать ядра для ускорения кода, я лишаю множество ядер, которые он мог бы использовать более эффективно.Чтобы выяснить это, я собираюсь рассчитать время выполнения простых операций и посмотреть, не будут ли они медленнее в моей параллельной версии по сравнению с моей последовательной версией.
Я обновлю, если узнаю что-то новое.Если вы, возможно, знаете реальную причину, по которой мой параллельный код намного медленнее, пожалуйста, дайте мне знать.Вот код:
import numpy as np
from time import time
import multiprocessing as mp
class Viterbi:
def __init__(self,A,B,pi):
self.M = A.shape[0] # number of hidden states
self.A = A # Transition Matrix
self.B = B # Observation Matrix
self.pi = pi # Initial distribution
self.T = None # time horizon
self.val_max = None
self.arg_max = None
self.obs = None
self.intermediate = mp.Queue()
self.result = mp.Queue()
def get_path(self,x):
'''Sequential/Serial Viterbi Algorithm with backtracking'''
self.T = len(x)
self.val_max = np.zeros((self.T, self.M))
self.arg_max = np.zeros((self.T, self.M))
self.val_max[0] = self.pi*self.B[:,x[0]]
for t in range(1, self.T):
# Indepedent Process
self.val_max[t] = np.max( self.A*np.outer(self.val_max[t-1],self.B[:,obs[t]]) , axis = 0 )
# Dependent Process
self.arg_max[t] = np.argmax( self.val_max[t-1]*self.A.T, axis = 1)
# BACKTRACK
states = np.zeros(self.T, dtype=np.int32)
states[self.T-1] = np.argmax(self.val_max[self.T-1])
for t in range(self.T-2, -1, -1):
states[t] = self.arg_max[t+1, states[t+1]]
return states
def get_val(self,intial_val_max):
'''Independent Poducer Process'''
val_max = intial_val_max
for t in range(1,self.T):
val_max = np.max( self.A*np.outer(val_max,self.B[:,self.obs[t]]) , axis = 0 )
#print('Transfer: ',self.val_max[t])
self.intermediate.put(val_max)
if t == self.T-1:
self.result.put(val_max) # we only need the last val_max value for backtracking
def get_arg(self):
'''Dependent Consumer Process.'''
t = 1
while t < self.T:
val_max =self.intermediate.get()
#print('Receive: ',val_max)
self.arg_max[t] = np.argmax( val_max*self.A.T, axis = 1)
if t == self.T-1:
self.result.put(self.arg_max)
#print('Processed: ',self.arg_max[t])
t += 1
def get_path_parallel(self,x):
'''Multiprocessing producer-consumer implementation of Viterbi algorithm.'''
self.obs = x
self.T = len(obs)
self.arg_max = np.zeros((self.T, self.M)) # we don't tabulate val_max anymore
initial_val_max = self.pi*self.B[:,obs[0]]
producer_process = mp.Process(target=self.get_val,args=(initial_val_max,),daemon=True)
consumer_process = mp.Process(target=self.get_arg,daemon=True)
self.intermediate.put(initial_val_max) # initial production put into pipeline for consumption
consumer_process.start() # we can already consume initial_val_max
producer_process.start()
#val_process.join()
#arg_process.join()
#self.output.join()
return self.backtrack(self.result.get(),self.result.get()) # backtrack takes last row of val_max and entire arg_max
def backtrack(self,val_max_last_row,arg_max):
'''Backtracking the Dynamic Programming solution (actually a Trellis diagram)
produced by Multiprocessing Viterbi algorithm.'''
states = np.zeros(self.T, dtype=np.int32)
states[self.T-1] = np.argmax(val_max_last_row)
for t in range(self.T-2, -1, -1):
states[t] = arg_max[t+1, states[t+1]]
return states
if __name__ == '__main__':
obs = np.array([0,1,2]) # normal then cold and finally dizzy
T = 100000
obs = np.random.binomial(2,0.3,T)
pi = np.array([0.6,0.4])
A = np.array([[0.7,0.3],
[0.4,0.6]])
B = np.array([[0.5,0.4,0.1],
[0.1,0.3,0.6]])
t1 = time()
viterbi = Viterbi(A,B,pi)
path = viterbi.get_path(obs)
t2 = time()
print('Iterative Viterbi')
print('Path: ',path)
print('Run-time: ',round(t2-t1,6))
t1 = time()
viterbi = Viterbi(A,B,pi)
path = viterbi.get_path_parallel(obs)
t2 = time()
print('\nParallel Viterbi')
print('Path: ',path)
print('Run-time: ',round(t2-t1,6))