Как создать пользовательский pyfunc, чтобы делать прогнозы, используя модель, для которой требуется форма ввода с более чем двумя измерениями, используя MLflow? - PullRequest
0 голосов
/ 26 февраля 2020

Я новичок в TensorFlow и MLflow, и у меня есть проблема, аналогичная той, которая была задана здесь. Я реализую модель TensorFlow для прогнозирования значений временных рядов. С этой целью я использовал MLflow mlflow.tensorflow.autolog(), чтобы отслеживать и обслуживать модели в моём случае. Тем не менее, поскольку моя входная форма имеет более двух измерений, я не смог использовать этот метод.

Как было предложено ранее, я пытался кодировать/декодировать входные данные в методе предсказания, используя для этого пользовательский pyfunc.

Таким образом, у меня есть файл model_test.py с классом, содержащим метод прогнозирования, который декодирует входные данные:

import sys
import os
import json
import mlflow
import numpy as np
import pandas as pd
from mlflow.pyfunc import PythonModel
import tensorflow as tf
import base64



class ModelTest(PythonModel):
    """MLflow pyfunc wrapper around a Conv1D + LSTM Keras time-series model.

    Inputs to :meth:`predict` arrive base64-encoded (raw float64 bytes) so
    that MLflow's two-dimensional pandas serving interface can transport a
    series that the model consumes as a >2-D windowed dataset.

    NOTE(review): ``windowed_dataset`` stores a ``tf.data.Dataset`` on the
    instance (``self.windowed_ds``).  That attribute is not picklable, which
    is the likely cause of the cloudpickle failure seen when calling
    ``mlflow.pyfunc.save_model`` — consider dropping it (``del``) before
    saving the model.
    """

    def __init__(self, estimator=None, window_size=64, batch_size=256,
                 shuffle_buffer_size=100):
        """Store windowing hyper-parameters.

        ``estimator`` is accepted but unused; it is kept for backward
        compatibility with existing callers.
        """
        self.window_size = window_size
        self.batch_size = batch_size
        self.shuffle_buffer_size = shuffle_buffer_size

    def windowed_dataset(self, series, window_size, batch_size, shuffle_buffer):
        """Build a shuffled, windowed training pipeline from a 1-D series.

        The pipeline is stored on ``self.windowed_ds`` (each element is a
        ``(window[:-1], window[1:])`` input/target pair) and ``self`` is
        returned to allow chaining.
        """
        series = tf.expand_dims(series, axis=-1)
        ds = tf.data.Dataset.from_tensor_slices(series)
        # window_size + 1 so each window yields both inputs and shifted targets.
        ds = ds.window(window_size + 1, shift=1, drop_remainder=True)
        ds = ds.flat_map(lambda w: w.batch(window_size + 1))
        ds = ds.shuffle(shuffle_buffer)
        ds = ds.map(lambda w: (w[:-1], w[1:]))
        self.windowed_ds = ds.batch(batch_size).prefetch(1)
        return self

    def train(self, train_set, y=None, epochs=500):
        """Build, compile and fit the forecasting model; returns ``self``.

        ``train_set`` is expected to be a batched dataset such as the one
        produced by :meth:`windowed_dataset` (in which case ``y`` stays None).
        """
        model = tf.keras.models.Sequential([
            tf.keras.layers.Conv1D(filters=60, kernel_size=5,
                                   strides=1, padding="causal",
                                   activation="relu",
                                   input_shape=[None, 1]),
            tf.keras.layers.LSTM(60, return_sequences=True),
            tf.keras.layers.Dense(10, activation="relu"),
            tf.keras.layers.Dense(1),
            # Rescale outputs to the series' magnitude range.
            tf.keras.layers.Lambda(lambda x: x * 400),
        ])
        # `learning_rate` replaces the deprecated `lr` alias (same value).
        optimizer = tf.keras.optimizers.SGD(learning_rate=1e-6, momentum=0.9)
        model.compile(loss=tf.keras.losses.Huber(),
                      optimizer=optimizer,
                      metrics=["mae"])
        # BUG FIX: previously hard-coded epochs=5, silently ignoring the
        # `epochs` argument.  Now the parameter is honored.
        model.fit(x=train_set, y=y, epochs=epochs)
        self.modelo = model
        return self

    def predict(self, series_encoded, window_size=60, batch_size=32):
        """Decode a base64-encoded series and return the model's forecast.

        Parameters
        ----------
        series_encoded : str or bytes
            Base64-encoded buffer of raw float64 values.
        window_size, batch_size : int
            Windowing parameters for the prediction pipeline.  Defaults
            preserve the previously hard-coded values (60 and 32).
        """
        def decode_ts(x):
            # Inverse of the client-side encoding: base64 -> raw float64 bytes.
            return pd.Series(np.frombuffer(base64.b64decode(x)))

        series_decode = decode_ts(series_encoded)
        # Add the channel dimension expected by the Conv1D input layer.
        series = np.expand_dims(series_decode, axis=1)
        ds = tf.data.Dataset.from_tensor_slices(series)
        ds = ds.window(window_size, shift=1, drop_remainder=True)
        ds = ds.flat_map(lambda w: w.batch(window_size))
        ds = ds.batch(batch_size).prefetch(1)
        forecast = self.modelo.predict(ds)
        return forecast

и файл run.py для обучения и сохранения модели:

import os
import mlflow.pyfunc
import ModelTest as model_test
import sys
import json
import mlflow
import numpy as np
from pymongo import MongoClient
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import Preprocessing as pre

#@click(...) # define the click options according to MLproject file
def run():
    # Code to load time series data from MongoDB and preprocess it

    window_size = 64
    batch_size = 256
    shuffle_buffer_size = 100
    split_time = 400

    series = np.array(data_df['sensor_ts'])
    time = np.array(data_df['time'])
    time_train = time[:split_time]
    x_train = series[:split_time]
    time_valid = time[split_time:]
    x_valid = series[split_time:]


    modelo = modelo_tercero.ModelTest()
    modelo.windowed_dataset(x_train, window_size, batch_size, shuffle_buffer_size)

    with mlflow.start_run() as run:
        model = modelo.train(modelo.windowed_ds)
        model_path = os.path.join('models', run.info.run_id)

        # Save model
        mlflow.pyfunc.save_model(
            path=model_path,
            python_model= modelo.train(modelo.windowed_ds),
            code_path=['Modelthird.py'],
            conda_env={
                'channels': ['defaults', 'conda-forge'],
                'dependencies': [
                    'mlflow=1.6.0',
                    'numpy=1.18.1',
                    'tensorflow=2.1.0',
                    'pandas=0.25.3',
                    'python=3.7.6',
                    'cloudpickle==0.5.8'
                ],
                'name': 'mlflow-env'
            }
        )


if __name__ == "__main__":
    run()

Когда я выполняю run.py, я получаю следующие ошибки, когда модель будет сохранена:

 Traceback (most recent call last):

File "run.py", line 116, in <module>
    run()
  File "run.py", line 110, in run
    'name': 'mlflow-env'
  File "/opt/conda/lib/python3.7/site-packages/mlflow/pyfunc/__init__.py", line 596, in save_model
    code_paths=code_path, mlflow_model=mlflow_model)
  File "/opt/conda/lib/python3.7/site-packages/mlflow/pyfunc/model.py", line 141, in _save_model_with_class_artifacts_params
    cloudpickle.dump(python_model, out)
  File "/opt/conda/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 1109, in dump
    CloudPickler(file, protocol=protocol).dump(obj)
  File "/opt/conda/lib/python3.7/site-packages/cloudpickle/cloudpickle.py", line 482, in dump
    return Pickler.dump(self, obj)
  File "/opt/conda/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/conda/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/opt/conda/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/opt/conda/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
  File "/opt/conda/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py", line 873, in __reduce__
    return convert_to_tensor, (self._numpy(),)
  File "/opt/conda/lib/python3.7/site-packages/tensorflow_core/python/framework/ops.py", line 910, in _numpy
    six.raise_from(core._status_to_exception(e.code, e.message), None)
  File "<string>", line 3, in raise_from

Я просмотрел различную документацию, связанную с сохранением и сериализацией моделей TensorFlow, но в MLflow не так много документации о моделях TensorFlow и пользовательских функциях pyfunc. Кто-нибудь может мне помочь или дать мне подсказку?

Заранее спасибо !! : D

...