Here is my subscriber:
    import paho.mqtt.client as mqtt
    from models.gain_ import GAIN
    from train_data_loader import data_loader
    import pandas as pd

    model = GAIN()
    model.train(data_loader("025a_ready_", 0.2)[0])


    def on_message(client, userdata, message):
        print("entering on message")
        # parameters = []
        imputed_data = model.impute(pd.read_pickle(message.payload))
        print(imputed_data)
        print(message.payload)
        print(message.topic)
        print(str(message.qos))
        print(type(message.payload))


    def main():
        client = mqtt.Client()
        client.on_message = on_message
        client.connect(host="localhost", port=1883)
        client.subscribe("approx-comp", qos=1)
        client.loop_forever()


    if __name__ == '__main__':
        main()
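A side note on the payload handling: pd.read_pickle expects a file path or a file-like object rather than the raw bytes in message.payload, so if the publisher sends a pickled DataFrame, the bytes need to be wrapped in an in-memory buffer first. A sketch of what I mean (this assumes a pickled DataFrame is what arrives on the topic):

    import io
    import pandas as pd

    def on_message(client, userdata, message):
        print("entering on message")
        # message.payload is a bytes object; wrap it in a buffer so
        # pd.read_pickle can treat it as a file-like source
        # (assumes the publisher pickled a DataFrame before publishing)
        frame = pd.read_pickle(io.BytesIO(message.payload))
        imputed_data = model.impute(frame)  # model is defined at module level above
        print(imputed_data)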
This is the model:
    import numpy as np
    from tqdm import tqdm
    from utils import normalization, xavier_init, renormalization, rounding, sample_batch_index, uniform_sampler, \
        binary_sampler
    import tensorflow.compat.v1 as tf


    class GAIN:
        def __init__(self, batch_size=25, hint_rate=0.9, alpha=0.5, iterations=100):
            self.sess = tf.Session()
            self.g_sample = None
            self.batch_size = batch_size
            self.hint_rate = hint_rate
            self.alpha = alpha
            self.iterations = iterations
            print("init hyperparams")

        def train(self, data_x):
            print("starting training")
            data_m = 1 - np.isnan(data_x)
            no, dim = data_x.shape
            h_dim = int(dim)
            norm_data, norm_parameters = normalization(data_x)
            norm_data_x = np.nan_to_num(norm_data, 0)
            print("initialized params")

            # GAIN architecture
            # Input placeholders
            # Data vector
            x = tf.placeholder(tf.float32, shape=[None, dim])
            # Mask vector
            m = tf.placeholder(tf.float32, shape=[None, dim])
            # Hint vector
            h = tf.placeholder(tf.float32, shape=[None, dim])

            # Discriminator variables
            d_w1 = tf.Variable(xavier_init([dim * 2, h_dim]))  # Data + Hint as inputs
            d_b1 = tf.Variable(tf.zeros(shape=[h_dim]))
            d_w2 = tf.Variable(xavier_init([h_dim, h_dim]))
            d_b2 = tf.Variable(tf.zeros(shape=[h_dim]))
            d_w3 = tf.Variable(xavier_init([h_dim, dim]))
            d_b3 = tf.Variable(tf.zeros(shape=[dim]))  # Multi-variate outputs
            theta_d = [d_w1, d_w2, d_w3, d_b1, d_b2, d_b3]
            print("init discriminator params")

            # Generator variables
            # Data + Mask as inputs (Random noise is in missing components)
            g_w1 = tf.Variable(xavier_init([dim * 2, h_dim]))
            g_b1 = tf.Variable(tf.zeros(shape=[h_dim]))
            g_w2 = tf.Variable(xavier_init([h_dim, h_dim]))
            g_b2 = tf.Variable(tf.zeros(shape=[h_dim]))
            g_w3 = tf.Variable(xavier_init([h_dim, dim]))
            g_b3 = tf.Variable(tf.zeros(shape=[dim]))
            theta_g = [g_w1, g_w2, g_w3, g_b1, g_b2, g_b3]
            print("init generator params")

            # GAIN functions
            # Generator
            def generator():
                # Concatenate Mask and Data
                inputs = tf.concat(values=[x, m], axis=1)
                g_h1 = tf.nn.relu(tf.matmul(inputs, g_w1) + g_b1)
                g_h2 = tf.nn.relu(tf.matmul(g_h1, g_w2) + g_b2)
                # MinMax normalized output
                g_prob = tf.nn.sigmoid(tf.matmul(g_h2, g_w3) + g_b3)
                return g_prob

            # Discriminator
            def discriminator():
                # Concatenate Data and Hint
                inputs = tf.concat(values=[x, h], axis=1)
                d_h1 = tf.nn.relu(tf.matmul(inputs, d_w1) + d_b1)
                d_h2 = tf.nn.relu(tf.matmul(d_h1, d_w2) + d_b2)
                d_logit = tf.matmul(d_h2, d_w3) + d_b3
                d_prob = tf.nn.sigmoid(d_logit)
                return d_prob

            # GAIN structure
            # Generator
            g_sample = generator()
            # Combine with observed data
            hat_x = x * m + g_sample * (1 - m)
            # Discriminator
            d_prob = discriminator()

            # GAIN loss
            d_loss_temp = -tf.reduce_mean(m * tf.log(d_prob + 1e-8)
                                          + (1 - m) * tf.log(1. - d_prob + 1e-8))
            g_loss_temp = -tf.reduce_mean((1 - m) * tf.log(d_prob + 1e-8))
            mse_loss = \
                tf.reduce_mean((m * x - m * g_sample) ** 2) / tf.reduce_mean(m)

            d_loss = d_loss_temp
            g_loss = g_loss_temp + self.alpha * mse_loss

            # GAIN solver
            d_solver = tf.train.AdamOptimizer().minimize(d_loss, var_list=theta_d)
            g_solver = tf.train.AdamOptimizer().minimize(g_loss, var_list=theta_g)

            # Iterations
            self.sess.run(tf.global_variables_initializer())
            print("starting iterations")
            for it in tqdm(range(self.iterations)):
                # Sample batch
                batch_idx = sample_batch_index(no, self.batch_size)
                x_mb = norm_data[batch_idx, :]
                m_mb = data_m[batch_idx, :]
                # Sample random vectors
                z_mb = uniform_sampler(0, 0.01, self.batch_size, dim)
                # Sample hint vectors
                h_mb_temp = binary_sampler(self.hint_rate, self.batch_size, dim)
                h_mb = m_mb * h_mb_temp
                # Combine random vectors with observed vectors
                x_mb = m_mb * x_mb + (1 - m_mb) * z_mb

                _, d_loss_curr = self.sess.run([d_solver, d_loss_temp],
                                               feed_dict={m: m_mb, x: x_mb, h: h_mb})
                _, g_loss_curr, mse_loss_curr = \
                    self.sess.run([g_solver, g_loss_temp, mse_loss],
                                  feed_dict={x: x_mb, m: m_mb, h: h_mb})
            print("done training")

        def impute(self, data_x):
            print("in impute")
            norm_data, norm_parameters = normalization(data_x)
            data_m = 1 - np.iszero(data_x)
            no, dim = data_x.shape
            x = tf.placeholder(tf.float32, shape=[None, dim])
            # Mask vector
            m = tf.placeholder(tf.float32, shape=[None, dim])

            # Return imputed data
            z_mb = uniform_sampler(0, 0.01, no, dim)
            m_mb = data_m
            x_mb = norm_data
            x_mb = m_mb * x_mb + (1 - m_mb) * z_mb

            imputed_data = self.sess.run([self.g_sample], feed_dict={x: x_mb, m: m_mb})[0]
            imputed_data = data_m * norm_data + (1 - data_m) * imputed_data

            # Renormalization
            imputed_data = renormalization(imputed_data, norm_parameters)
            # Rounding
            imputed_data = rounding(imputed_data, data_x)
            return imputed_data
This is my utils file:
"""Utility functions for GAIN.
(1) normalization: MinMax Normalizer
(2) renormalization: Recover the data from normalized data
(3) rounding: Handle categorical variables after imputation
(4) rmse_loss: Evaluate imputed data in terms of RMSE
(5) xavier_init: Xavier initialization
(6) binary_sampler: sample binary random variables
(7) uniform_sampler: sample uniform random variables
(8) sample_batch_index: sample random batch index
"""
# Necessary packages
import numpy as np
import tensorflow as tf
def normalization(data):
"""Normalize data in [0, 1] range.
Args:
- data: original data
Returns:
- norm_data: normalized data
- norm_parameters: min_val, max_val for each feature for renormalization
"""
# Parameters
_, dim = data.shape
norm_data = data.copy()
        # MinMax normalization
        min_val = np.zeros(dim)
        max_val = np.zeros(dim)

        # For each dimension
        for i in range(dim):
            min_val[i] = np.nanmin(norm_data[:, i])
            norm_data[:, i] = norm_data[:, i] - np.nanmin(norm_data[:, i])
            max_val[i] = np.nanmax(norm_data[:, i])
            norm_data[:, i] = norm_data[:, i] / (np.nanmax(norm_data[:, i]) + 1e-6)

        # Return norm_parameters for renormalization
        norm_parameters = {'min_val': min_val,
                           'max_val': max_val}
        return norm_data, norm_parameters


    def renormalization(norm_data, norm_parameters):
        """Renormalize data from [0, 1] range to the original range.

        Args:
            - norm_data: normalized data
            - norm_parameters: min_val, max_val for each feature for renormalization

        Returns:
            - renorm_data: renormalized original data
        """
        min_val = norm_parameters['min_val']
        max_val = norm_parameters['max_val']

        _, dim = norm_data.shape
        renorm_data = norm_data.copy()

        for i in range(dim):
            renorm_data[:, i] = renorm_data[:, i] * (max_val[i] + 1e-6)
            renorm_data[:, i] = renorm_data[:, i] + min_val[i]

        return renorm_data


    def rounding(imputed_data, data_x):
        """Round imputed data for categorical variables.

        Args:
            - imputed_data: imputed data
            - data_x: original data with missing values

        Returns:
            - rounded_data: rounded imputed data
        """
        _, dim = data_x.shape
        rounded_data = imputed_data.copy()

        for i in range(dim):
            temp = data_x[~np.isnan(data_x[:, i]), i]
            # Only for the categorical variable
            if len(np.unique(temp)) < 20:
                rounded_data[:, i] = np.round(rounded_data[:, i])

        return rounded_data


    def rmse_loss(ori_data, imputed_data, data_m):
        """Compute RMSE loss between ori_data and imputed_data.

        Args:
            - ori_data: original data without missing values
            - imputed_data: imputed data
            - data_m: indicator matrix for missingness

        Returns:
            - rmse: Root Mean Squared Error
        """
        ori_data, _ = normalization(ori_data)
        imputed_data, _ = normalization(imputed_data)

        # Only for missing values
        nominator = np.sum(((1 - data_m) * ori_data - (1 - data_m) * imputed_data) ** 2)
        denominator = np.sum(1 - data_m)
        rmse = np.sqrt(nominator / float(denominator))
        return rmse


    def xavier_init(size):
        """Xavier initialization.

        Args:
            - size: vector size

        Returns:
            - initialized random vector.
        """
        in_dim = size[0]
        xavier_stddev = 1. / tf.sqrt(in_dim / 2.)
        return tf.random.normal(shape=size, stddev=xavier_stddev)


    def binary_sampler(p, rows, cols):
        """Sample binary random variables.

        Args:
            - p: probability of 1
            - rows: the number of rows
            - cols: the number of columns

        Returns:
            - binary_random_matrix: generated binary random matrix.
        """
        unif_random_matrix = np.random.uniform(0., 1., size=[rows, cols])
        binary_random_matrix = 1 * (unif_random_matrix < p)
        return binary_random_matrix


    def uniform_sampler(low, high, rows, cols):
        """Sample uniform random variables.

        Args:
            - low: low limit
            - high: high limit
            - rows: the number of rows
            - cols: the number of columns

        Returns:
            - uniform_random_matrix: generated uniform random matrix.
        """
        return np.random.uniform(low, high, size=[rows, cols])


    def sample_batch_index(total, batch_size):
        """Sample index of the mini-batch.

        Args:
            - total: total number of samples
            - batch_size: batch size

        Returns:
            - batch_idx: batch index
        """
        total_idx = np.random.permutation(total)
        batch_idx = total_idx[:batch_size]
        return batch_idx
This is my data_loader:
    import numpy as np
    from utils import binary_sampler
    from keras.datasets import mnist


    def data_loader(data_name, miss_rate):
        '''Loads datasets and introduces missingness.

        Args:
            - data_name: letter, spam, mnist, or 025a_ready_
            - miss_rate: the probability of missing components

        Returns:
            data_x: original data
            miss_data_x: data with missing values
            data_m: indicator matrix for missing components
        '''
        # Load data
        if data_name in ['letter', 'spam', '025a_ready_']:
            file_name = data_name + '.csv'
            data_x = np.loadtxt(file_name, delimiter=",", skiprows=1)
        elif data_name == 'mnist':
            (data_x, _), _ = mnist.load_data()
            data_x = np.reshape(np.asarray(data_x), [60000, 28 * 28]).astype(float)

        # Parameters
        no, dim = data_x.shape

        # Introduce missing data
        data_m = binary_sampler(1 - miss_rate, no, dim)
        miss_data_x = data_x.copy()
        miss_data_x[data_m == 0] = np.nan

        return data_x, miss_data_x, data_m
Anyway, after a message is received and "entering on message" is printed, on_message() just skips everything else in the callback. Can anyone explain to me what is going wrong, or how I can debug the error that makes the callback skip the code? Thanks.
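One way to make a hidden error visible is a try/except around the callback body. This is a minimal sketch, assuming the callback is dying on an exception that paho-mqtt suppresses; depending on the client version, exceptions raised inside callbacks are swallowed or only routed to the client's logger rather than crashing the loop:

    import traceback

    def on_message(client, userdata, message):
        print("entering on message")
        try:
            imputed_data = model.impute(pd.read_pickle(message.payload))
            print(imputed_data)
        except Exception:
            # Print the otherwise-swallowed traceback so the real error
            # shows up in the console instead of the callback silently
            # returning after the first print
            traceback.print_exc()

Attaching a logger to the client (client.enable_logger()) should also surface callback exceptions through Python's logging machinery.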