blob: 2cc29c02be9493d93eede2373d091cca6e8eeeef [file] [log] [blame]
# Lint as: python3
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test keras Model training."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from absl import flags
import numpy as np
from pyiree.tf.support import tf_test_utils
from sklearn.preprocessing import PolynomialFeatures
import tensorflow as tf
FLAGS = flags.FLAGS
flags.DEFINE_string(
"optimizer_name", "sgd",
"optimizer name: sgd, rmsprop, nadam, adamax, adam, adagrad, adadelta")
_DEGREE = 3 # polynomial degree of input feature for regression test
_FEATURE_SIZE = _DEGREE + 1 # input feature size
_BATCH_SIZE = 8 # batch size has to be dynamic TODO(b/142948097)
_INPUT_DATA_SHAPE = [_BATCH_SIZE, _FEATURE_SIZE]
_OUTPUT_DATA_SHAPE = [_BATCH_SIZE, 1]
class ModelTrain(tf.Module):
"""A module for model training."""
@staticmethod
def CreateModule(input_dim=_FEATURE_SIZE, output_dim=1):
"""Creates a module for regression model training.
Args:
input_dim: input dimensionality
output_dim: output dimensionality
Returns:
model for linear regression
"""
tf_test_utils.set_random_seed()
# build a single layer model
inputs = tf.keras.layers.Input((input_dim))
outputs = tf.keras.layers.Dense(output_dim)(inputs)
model = tf.keras.Model(inputs, outputs)
return ModelTrain(model)
def __init__(self, model):
self.model = model
self.loss = tf.keras.losses.MeanSquaredError()
self.optimizer = tf.keras.optimizers.get(FLAGS.optimizer_name)
@tf.function(input_signature=[
tf.TensorSpec(_INPUT_DATA_SHAPE, tf.float32),
tf.TensorSpec(_OUTPUT_DATA_SHAPE, tf.float32)
])
def TrainStep(self, inputs, targets):
with tf.GradientTape() as tape:
predictions = self.model(inputs, training=True)
loss_value = self.loss(predictions, targets)
gradients = tape.gradient(loss_value, self.model.trainable_variables)
self.optimizer.apply_gradients(
zip(gradients, self.model.trainable_variables))
return loss_value
@tf_test_utils.compile_modules(
backends=["tf"], train_module=(ModelTrain.CreateModule, ["TrainStep"]))
class ModelTrainTest(tf_test_utils.SavedModelTestCase):
def generate_regression_data(self, size=8):
x = np.arange(size) - size // 2
y = 1.0 * x**3 + 1.0 * x**2 + 1.0 * x + np.random.randn(size) * size
return x, y
def test_model_train(self):
# generate input and output data for regression problem
inputs, targets = self.generate_regression_data()
# normalize data
inputs = inputs / max(inputs)
targets = targets / max(targets)
# generate plynomial features
inputs = np.expand_dims(inputs, axis=1)
polynomial = PolynomialFeatures(_DEGREE) # returns: [1, a, b, a^2, ab, b^2]
inputs = polynomial.fit_transform(inputs)
targets = np.expand_dims(targets, axis=1)
# run one iteration of training step
result = self.modules.train_module.all.TrainStep(inputs, targets)
result.print().assert_all_close()
if __name__ == "__main__":
if hasattr(tf, "enable_v2_behavior"):
tf.enable_v2_behavior()
tf.test.main()