The on_message() callback fails when trying to apply the message payload to a TensorFlow model

Here is my subscriber:

import paho.mqtt.client as mqtt
from models.gain_ import GAIN
from train_data_loader import data_loader
import pandas as pd

model = GAIN()
model.train(data_loader("025a_ready_", 0.2)[0])


def on_message(client, userdata, message):
    print("entering on message")
    # parameters = []

    imputed_data = model.impute(pd.read_pickle(message.payload))
    print(imputed_data)
    print(message.payload)
    print(message.topic)
    print(str(message.qos))
    print(type(message.payload))


def main():

    client = mqtt.Client()
    client.on_message = on_message
    client.connect(host="localhost", port=1883)

    client.subscribe("approx-comp", qos=1)
    client.loop_forever()


if __name__ == '__main__':
    main()
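
Since on_message() deserializes the payload with pd.read_pickle(message.payload), here is a minimal offline check of that round trip, assuming the publisher sends a pickled DataFrame and a recent pandas (the sample frame below is only for illustration):

import io
import pickle

import pandas as pd

# Stand-in for whatever the publisher actually serializes (illustrative only)
df = pd.DataFrame({"a": [1.0, None], "b": [3.0, 4.0]})
payload = pickle.dumps(df)  # this is what would arrive as message.payload

# pd.read_pickle expects a path or a file-like object rather than raw bytes,
# so the payload is wrapped in a BytesIO buffer here; pickle.loads(payload)
# is an alternative that accepts the bytes directly
restored = pd.read_pickle(io.BytesIO(payload))
print(restored.equals(df))  # True if the round trip preserved the frame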

Here is the model:

import numpy as np

from tqdm import tqdm
from utils import normalization, xavier_init, renormalization, rounding, sample_batch_index, uniform_sampler, \
    binary_sampler
import tensorflow.compat.v1 as tf


class GAIN:

    def __init__(self, batch_size=25, hint_rate=0.9, alpha=0.5, iterations=100):
        self.sess = tf.Session()
        self.g_sample = None
        self.batch_size = batch_size
        self.hint_rate = hint_rate
        self.alpha = alpha
        self.iterations = iterations
        print("init hyperparams")

    def train(self, data_x):
        print("starting training")
        data_m = 1 - np.isnan(data_x)
        no, dim = data_x.shape
        h_dim = int(dim)
        norm_data, norm_parameters = normalization(data_x)
        norm_data_x = np.nan_to_num(norm_data, 0)
        print("initialized params")

        # GAIN architecture
        # Input placeholders
        # Data vector
        x = tf.placeholder(tf.float32, shape=[None, dim])
        # Mask vector
        m = tf.placeholder(tf.float32, shape=[None, dim])
        # Hint vector
        h = tf.placeholder(tf.float32, shape=[None, dim])

        # Discriminator variables
        d_w1 = tf.Variable(xavier_init([dim * 2, h_dim]))  # Data + Hint as inputs
        d_b1 = tf.Variable(tf.zeros(shape=[h_dim]))

        d_w2 = tf.Variable(xavier_init([h_dim, h_dim]))
        d_b2 = tf.Variable(tf.zeros(shape=[h_dim]))

        d_w3 = tf.Variable(xavier_init([h_dim, dim]))
        d_b3 = tf.Variable(tf.zeros(shape=[dim]))  # Multi-variate outputs

        theta_d = [d_w1, d_w2, d_w3, d_b1, d_b2, d_b3]

        print("init discriminator params")

        # Generator variables
        # Data + Mask as inputs (Random noise is in missing components)
        g_w1 = tf.Variable(xavier_init([dim * 2, h_dim]))
        g_b1 = tf.Variable(tf.zeros(shape=[h_dim]))

        g_w2 = tf.Variable(xavier_init([h_dim, h_dim]))
        g_b2 = tf.Variable(tf.zeros(shape=[h_dim]))

        g_w3 = tf.Variable(xavier_init([h_dim, dim]))
        g_b3 = tf.Variable(tf.zeros(shape=[dim]))

        theta_g = [g_w1, g_w2, g_w3, g_b1, g_b2, g_b3]

        print("init generator params")

        # GAIN functions
        # Generator
        def generator():
            # Concatenate Mask and Data
            inputs = tf.concat(values=[x, m], axis=1)
            g_h1 = tf.nn.relu(tf.matmul(inputs, g_w1) + g_b1)
            g_h2 = tf.nn.relu(tf.matmul(g_h1, g_w2) + g_b2)
            # MinMax normalized output
            g_prob = tf.nn.sigmoid(tf.matmul(g_h2, g_w3) + g_b3)
            return g_prob

        # Discriminator
        def discriminator():
            # Concatenate Data and Hint
            inputs = tf.concat(values=[x, h], axis=1)
            d_h1 = tf.nn.relu(tf.matmul(inputs, d_w1) + d_b1)
            d_h2 = tf.nn.relu(tf.matmul(d_h1, d_w2) + d_b2)
            d_logit = tf.matmul(d_h2, d_w3) + d_b3
            d_prob = tf.nn.sigmoid(d_logit)
            return d_prob

        # GAIN structure
        # Generator
        g_sample = generator()

        # Combine with observed data
        hat_x = x * m + g_sample * (1 - m)

        # Discriminator
        d_prob = discriminator()

        # GAIN loss
        d_loss_temp = -tf.reduce_mean(m * tf.log(d_prob + 1e-8)
                                      + (1 - m) * tf.log(1. - d_prob + 1e-8))

        g_loss_temp = -tf.reduce_mean((1 - m) * tf.log(d_prob + 1e-8))

        mse_loss = \
            tf.reduce_mean((m * x - m * g_sample) ** 2) / tf.reduce_mean(m)

        d_loss = d_loss_temp
        g_loss = g_loss_temp + self.alpha * mse_loss

        # GAIN solver
        d_solver = tf.train.AdamOptimizer().minimize(d_loss, var_list=theta_d)
        g_solver = tf.train.AdamOptimizer().minimize(g_loss, var_list=theta_g)

        # Iterations
        self.sess.run(tf.global_variables_initializer())

        print("starting iterations")

        for it in tqdm(range(self.iterations)):
            # Sample batch
            batch_idx = sample_batch_index(no, self.batch_size)
            x_mb = norm_data[batch_idx, :]
            m_mb = data_m[batch_idx, :]
            # Sample random vectors
            z_mb = uniform_sampler(0, 0.01, self.batch_size, dim)
            # Sample hint vectors
            h_mb_temp = binary_sampler(self.hint_rate, self.batch_size, dim)
            h_mb = m_mb * h_mb_temp

            # Combine random vectors with observed vectors
            x_mb = m_mb * x_mb + (1 - m_mb) * z_mb

            _, d_loss_curr = self.sess.run([d_solver, d_loss_temp],
                                           feed_dict={m: m_mb, x: x_mb, h: h_mb})
            _, g_loss_curr, mse_loss_curr = \
                self.sess.run([g_solver, g_loss_temp, mse_loss],
                              feed_dict={x: x_mb, m: m_mb, h: h_mb})

            print("done training")

    def impute(self, data_x):

        print("in impute")

        norm_data, norm_parameters = normalization(data_x)

        data_m = 1 - np.iszero(data_x)

        no, dim = data_x.shape

        x = tf.placeholder(tf.float32, shape=[None, dim])
        # Mask vector
        m = tf.placeholder(tf.float32, shape=[None, dim])

        # Return imputed data
        z_mb = uniform_sampler(0, 0.01, no, dim)
        m_mb = data_m
        x_mb = norm_data
        x_mb = m_mb * x_mb + (1 - m_mb) * z_mb

        imputed_data = self.sess.run([self.g_sample], feed_dict={x: x_mb, m: m_mb})[0]

        imputed_data = data_m * norm_data + (1 - data_m) * imputed_data

        # Renormalization
        imputed_data = renormalization(imputed_data, norm_parameters)

        # Rounding
        imputed_data = rounding(imputed_data, data_x)

        return imputed_data

Here is my utils file:

"""Utility functions for GAIN.

(1) normalization: MinMax Normalizer
(2) renormalization: Recover the data from normalized data
(3) rounding: Handle categorical variables after imputation
(4) rmse_loss: Evaluate imputed data in terms of RMSE
(5) xavier_init: Xavier initialization
(6) binary_sampler: sample binary random variables
(7) uniform_sampler: sample uniform random variables
(8) sample_batch_index: sample random batch index
"""

# Necessary packages
import numpy as np
import tensorflow as tf


def normalization(data):
    """Normalize data in [0, 1] range.

    Args:
      - data: original data

    Returns:
      - norm_data: normalized data
      - norm_parameters: min_val, max_val for each feature for renormalization
    """

    # Parameters
    _, dim = data.shape
    norm_data = data.copy()

    # MinMax normalization
    min_val = np.zeros(dim)
    max_val = np.zeros(dim)

    # For each dimension
    for i in range(dim):
        min_val[i] = np.nanmin(norm_data[:, i])
        norm_data[:, i] = norm_data[:, i] - np.nanmin(norm_data[:, i])
        max_val[i] = np.nanmax(norm_data[:, i])
        norm_data[:, i] = norm_data[:, i] / (np.nanmax(norm_data[:, i]) + 1e-6)

    # Return norm_parameters for renormalization
    norm_parameters = {'min_val': min_val,
                       'max_val': max_val}

    return norm_data, norm_parameters


def renormalization(norm_data, norm_parameters):
    """Renormalize data from [0, 1] range to the original range.

    Args:
      - norm_data: normalized data
      - norm_parameters: min_val, max_val for each feature for renormalization

    Returns:
      - renorm_data: renormalized original data
    """

    min_val = norm_parameters['min_val']
    max_val = norm_parameters['max_val']

    _, dim = norm_data.shape
    renorm_data = norm_data.copy()

    for i in range(dim):
        renorm_data[:, i] = renorm_data[:, i] * (max_val[i] + 1e-6)
        renorm_data[:, i] = renorm_data[:, i] + min_val[i]

    return renorm_data


def rounding(imputed_data, data_x):
    """Round imputed data for categorical variables.

    Args:
      - imputed_data: imputed data
      - data_x: original data with missing values

    Returns:
      - rounded_data: rounded imputed data
    """

    _, dim = data_x.shape
    rounded_data = imputed_data.copy()

    for i in range(dim):
        temp = data_x[~np.isnan(data_x[:, i]), i]
        # Only for the categorical variable
        if len(np.unique(temp)) < 20:
            rounded_data[:, i] = np.round(rounded_data[:, i])

    return rounded_data


def rmse_loss(ori_data, imputed_data, data_m):
    """Compute RMSE loss between ori_data and imputed_data

    Args:
      - ori_data: original data without missing values
      - imputed_data: imputed data
      - data_m: indicator matrix for missingness

    Returns:
      - rmse: Root Mean Squared Error
    """

    ori_data, _ = normalization(ori_data)
    imputed_data, _ = normalization(imputed_data)

    # Only for missing values
    nominator = np.sum(((1 - data_m) * ori_data - (1 - data_m) * imputed_data) ** 2)
    denominator = np.sum(1 - data_m)

    rmse = np.sqrt(nominator / float(denominator))

    return rmse


def xavier_init(size):
    """Xavier initialization.

    Args:
      - size: vector size

    Returns:
      - initialized random vector.
    """
    in_dim = size[0]
    xavier_stddev = 1. / tf.sqrt(in_dim / 2.)
    return tf.random.normal(shape=size, stddev=xavier_stddev)


def binary_sampler(p, rows, cols):
    """Sample binary random variables.

    Args:
      - p: probability of 1
      - rows: the number of rows
      - cols: the number of columns

    Returns:
      - binary_random_matrix: generated binary random matrix.
    """
    unif_random_matrix = np.random.uniform(0., 1., size=[rows, cols])
    binary_random_matrix = 1 * (unif_random_matrix < p)
    return binary_random_matrix


def uniform_sampler(low, high, rows, cols):
    """Sample uniform random variables.

    Args:
      - low: low limit
      - high: high limit
      - rows: the number of rows
      - cols: the number of columns

    Returns:
      - uniform_random_matrix: generated uniform random matrix.
    """
    return np.random.uniform(low, high, size=[rows, cols])


def sample_batch_index(total, batch_size):
    """Sample index of the mini-batch.

    Args:
      - total: total number of samples
      - batch_size: batch size

    Returns:
      - batch_idx: batch index
    """
    total_idx = np.random.permutation(total)
    batch_idx = total_idx[:batch_size]
    return batch_idx

Here is my data_loader:


import numpy as np
from utils import binary_sampler
from keras.datasets import mnist


def data_loader(data_name, miss_rate):
  '''Loads a dataset and introduces missingness.
  
  Args:
    - data_name: letter, spam, mnist, or 025a_ready_
    - miss_rate: the probability of missing components
    
  Returns:
    data_x: original data
    miss_data_x: data with missing values
    data_m: indicator matrix for missing components
  '''
  
  # Load data
  if data_name in ['letter', 'spam', '025a_ready_']:
    file_name = data_name+'.csv'
    data_x = np.loadtxt(file_name, delimiter=",", skiprows=1)
  elif data_name == 'mnist':
    (data_x, _), _ = mnist.load_data()
    data_x = np.reshape(np.asarray(data_x), [60000, 28*28]).astype(float)

  # Parameters
  no, dim = data_x.shape
  
  # Introduce missing data
  data_m = binary_sampler(1-miss_rate, no, dim)
  miss_data_x = data_x.copy()
  miss_data_x[data_m == 0] = np.nan
      
  return data_x, miss_data_x, data_m

In any case, after a message is received and "entering on message" is printed, on_message() simply skips everything else in the callback. Can anyone explain what is going wrong, or how I can debug the error that makes the callback skip the rest of the code? Thanks.
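
A minimal sketch of how the callback could be wrapped so that any exception is printed instead of silently ending the handler (same model, imports, and topic as in the subscriber above):

import traceback

def on_message(client, userdata, message):
    try:
        print("entering on message")
        imputed_data = model.impute(pd.read_pickle(message.payload))
        print(imputed_data)
    except Exception:
        # Print the full stack trace so the failing line becomes visible
        traceback.print_exc()

Depending on the paho-mqtt version, calling client.enable_logger() before connecting may also surface errors that the network loop catches internally.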
