Вот несколько идей для начала:
Мы можем использовать тот факт, что любой начальный вектор вероятности будет сходиться в стационарном состоянии при эволюции во времени (предполагая, что это ergodi c, aperiodi c, регулярное, et c).
Для небольших матриц мы могли бы использовать
def steady_state(matrix):
dim = matrix.shape[0]
prob = np.ones(dim) / dim
other = np.zeros(dim)
while np.linalg.norm(prob - other) > 1e-3:
other = prob.copy()
prob = other @ matrix
return prob
(я думаю, что условием, предполагаемым функцией в вопросе, является распределение go в строках).
Теперь мы можем использовать тот факт, что умножение матриц и norm
может быть выполнено чанком по чанку:
def steady_state_chunk(matrix, block_in=100, block_out=10):
dim = matrix.shape[0]
prob = np.ones(dim) / dim
error = 1.
while error > 1e-3:
error = 0.
other = prob.copy()
for i in range(0, dim, block_out):
outs = np.s_[i:i+block_out]
vec_out = np.zeros(block_out)
for j in range(0, dim, block_in):
ins = np.s_[j:j+block_in]
vec_out += other[ins] @ matrix[ins, outs]
error += np.linalg.norm(vec_out - prob[outs])**2
prob[outs] = vec_out
error = np.sqrt(error)
return prob
Это должно использовать меньше памяти для временных, хотя вы могли бы добиться большего, используя out
параметр np.matmul
. Я должен добавить кое-что, чтобы иметь дело с последним срезом в каждом l oop, в случае, если dim
не делится на block_*
, но я надеюсь, что вы поняли.
Для массивов, которые не Начав с того, что уместились в памяти, вы можете применить инструменты по ссылкам в комментариях выше.