I'm trying to implement a simple EM algorithm to estimate the parameters of a normally distributed random sample. The estimated mean does converge close to the true mean, but the estimated variance comes out wrong. I believe my parameter calculations in the maximization step are correct, but I can't see any other source of the problem.
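For reference, the M-step updates I believe I am implementing are the standard ones for a Gaussian mixture, where $\gamma_{ij}$ is the responsibility of component $j$ for sample $x_i$ computed in the E-step:

$$\mu_j = \frac{\sum_i \gamma_{ij}\, x_i}{\sum_i \gamma_{ij}}, \qquad \sigma_j^2 = \frac{\sum_i \gamma_{ij}\,(x_i - \mu_j)^2}{\sum_i \gamma_{ij}}, \qquad \pi_j = \frac{\sum_i \gamma_{ij}}{N}$$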
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

def generate_data(n_samples, distribution_generator, distribution_params):
    # Draw n_samples from each parameter set and concatenate the subsets.
    data = np.array([])
    for param_set in distribution_params:
        data_subset = distribution_generator(**param_set, size=n_samples)
        data = np.hstack([data, data_subset])
    return data

def pdf(x, loc, scale):
    # Normal density, with `scale` used as the variance.
    s1 = 1 / np.sqrt(2 * np.pi * scale)
    s2 = np.exp(-np.square(x - loc) / (2 * scale))
    return s1 * s2

def expectation_step(init_params, data, pdf, prior):
    # Build a responsibilities table: one row per sample, one column per component.
    posterior_table = pd.DataFrame()
    posterior_table['x'] = data
    for j in range(len(prior)):
        posterior_table[f'resp_dist{j}'] = np.nan
    for i, xi in enumerate(data):
        # Evidence: prior-weighted sum of the component densities at x_i.
        evidence = 0
        for j, dist_params in enumerate(init_params):
            evidence += pdf(**dist_params, x=xi) * prior[j]
        for j, dist_params in enumerate(init_params):
            prob_of_kj = pdf(**dist_params, x=xi) * prior[j]
            posterior = prob_of_kj / evidence
            posterior_table.iloc[i, j + 1] = posterior
    return posterior_table

def maximization_step(posterior_table, current_params):
    updated_prior = []
    resp_cols = [col for col in posterior_table.columns if 'resp' in col]
    for index, dist_params in enumerate(current_params):
        # Re-estimate this component's parameters from the responsibility-weighted data.
        resp = posterior_table[f'resp_dist{index}']
        new_mu = (posterior_table['x'] * resp).sum() / resp.sum()
        new_cov = (resp * np.square(posterior_table['x'] - new_mu)).sum() / resp.sum()
        dist_params['loc'] = new_mu
        dist_params['scale'] = new_cov
        # New mixture weight: this component's share of the total responsibility.
        updated_prior.append(resp.sum() / posterior_table[resp_cols].values.sum())
    return current_params, updated_prior

def iterate_through_steps(current_params, data, pdf, prior, n_iter=10):
    new_params = current_params
    for step in range(n_iter):
        print(new_params, '\n')
        posterior_table = expectation_step(new_params, data, pdf, prior)
        new_params, prior = maximization_step(posterior_table, new_params)
        plot_current_params(data, new_params)
    return new_params

def plot_current_params(data, current_params):
    bins = np.linspace(np.min(data), np.max(data), 100)
    for params in current_params:
        plt.plot(bins, pdf(x=bins, **params), color='gray', alpha=0.1)
    plt.pause(0.05)
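
As an aside, my `pdf` takes `scale` to be the variance rather than the standard deviation. A quick spot check against scipy (my own addition; assumes scipy is installed) confirms the density is computed correctly under that convention:

from scipy.stats import norm

# Spot check: since pdf() parameterizes the Gaussian by its variance, it
# should agree with scipy.stats.norm evaluated with std = sqrt(scale).
assert np.isclose(pdf(x=1.0, loc=0.0, scale=4.0), norm.pdf(1.0, loc=0.0, scale=2.0))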

n_samples = 100
n_iter = 100
distribution_params = [{'loc': 5.5, 'scale': 2.}]
distribution_generator = np.random.normal
data = generate_data(n_samples, distribution_generator, distribution_params)

init_params = [{'loc': 10.0, 'scale': 4.}]
prior = np.ones(len(distribution_params)) / len(distribution_params)

# visualize the training data
bins = np.linspace(np.min(data), np.max(data), n_samples)
plt.figure(figsize=(10, 7))
plt.xlabel("$x$")
plt.ylabel("pdf")
plt.scatter(data, [0.005] * len(data), color='navy', s=10, marker=2, alpha=0.5, label="Train data")
for params in distribution_params:
    plt.plot(bins, pdf(x=bins, **params), color='red', label='True distribution')
plt.legend()

iterate_through_steps(init_params, data, pdf, prior, n_iter)
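
As a minimal sanity check (my own addition): with a single component every responsibility is 1, so I would expect the converged parameters printed above to match the plain sample statistics:

# Sanity check: with one component the EM estimates should reproduce
# the sample mean and variance of the training data.
print(f"sample mean: {np.mean(data):.3f}")
print(f"sample variance: {np.var(data):.3f}")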