本文主要介绍在tensorflow2.0中如何自定义loss,metric,以及如何动态检测loss 实现提前停止。
以手写数字数据集作为模型的输入,构造一个简单的模型用于说明。
(1)引入必要的库,加载数据集
import tensorflow as tf import tensorflow.keras.layers as layers import tensorflow.keras as keras (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data() # Preprocess the data (these are NumPy arrays) x_train = x_train.reshape(60000, 784).astype("float32") / 255 x_test = x_test.reshape(10000, 784).astype("float32") / 255 y_train = y_train.astype("float32") y_test = y_test.astype("float32")(2)定义一个简单的模型
def get_uncompiled_model(): inputs = keras.Input(shape=(784,), name="digits") x = layers.Dense(64, activation="relu", name="dense_1")(inputs) x = layers.Dense(64, activation="relu", name="dense_2")(x) outputs = layers.Dense(10, activation="softmax", name="predictions")(x) model = keras.Model(inputs=inputs, outputs=outputs) return model上述模型并未经过compile,在进行模型的compile之前,我们可以进行自定义loss,metric。
接下来先说说如何自定义损失函数loss。
基本步骤:先定义一个函数,在将函数传入loss, 然后进行compile。
(1) 简单loss
自定义loss 只需要 y_true 和 y_pred两个参数
主要代码如下:
# 定义均方误差损失函数 def custom_mean_squared_error(y_true,y_pred): return tf.math.reduce_mean(tf.square(y_true-y_pred)) model = get_uncompiled_model() # 将定义好的函数传入loss中, model.compile(optimizer=keras.optimizers.Adam(), metrics=['AUC'],loss=custom_mean_squared_error) # 标签one-hot y_train_one_hot = tf.one_hot(y_train.astype(int), depth=10) y_test_one_hot = tf.one_hot(y_test.astype(int), depth=10) # 开始训练 model.fit(x_train, y_train_one_hot,batch_size=64,epochs=1,validation_data=(x_test,y_test_one_hot))这样就实现自定义的loss了。
(2) 复杂loss
需要一个使用除 y_true 和 y_pred 之外的其他参数的损失函数,则可以将 tf.keras.losses.Loss 类子类化,并实现以下两个方法:
_init_(self):接受要在调用损失函数期间传递的参数call(self, y_true, y_pred):使用目标 (y_true) 和模型预测 (y_pred) 来计算模型的损失 # tf.keras.losses.Loss 类子类化 class CustomMSE(keras.losses.Loss): # 传入所需要的参数regularization_factor def __init__(self,regularization_factor=0.1,name='mse_1'): super().__init__(name=name) self.regularization_factor = regularization_factor # 使用目标 (y_true) 和模型预测 (y_pred) 来计算模型的损失 def call(self, y_true, y_pred): mse = tf.math.reduce_mean(tf.square(y_true-y_pred)) # 新增损失部分 reg = tf.math.reduce_mean(tf.square(0.5-y_pred)) # 返回最终的损失 return mse + reg*self.regularization_factor model = get_uncompiled_model() # ccompile时传入自定义loss model.compile(optimizer=keras.optimizers.Adam(), metrics=['AUC'],loss=CustomMSE()) y_train_one_hot = tf.one_hot(y_train.astype(int), depth=10) model.fit(x_train, y_train_one_hot, batch_size=64, epochs=1,validation_data=(x_test,y_test_one_hot))自定义metrics过将 tf.keras.metrics.Metric 类子类化来轻松创建自定义指标。需要实现 4 个方法:
__init__(self),在其中为指标创建状态变量。update_state(self, y_true, y_pred, sample_weight=None),使用目标 y_true 和模型预测 y_pred 更新状态变量。result(self),使用状态变量来计算最终结果。reset_states(self),用于重新初始化指标的状态。状态更新和结果计算分开保存(分别保存在 update_state() 和 result() 中),因为在某些情况下,结果计算的开销可能会非常大,并且只能定期执行。
class CategoricalTruePositives(keras.metrics.Metric): def __init__(self, name="categorical_true_positives", **kwargs): super(CategoricalTruePositives, self).__init__(name=name, **kwargs) self.true_positives = self.add_weight(name="ctp", initializer="zeros") def update_state(self, y_true, y_pred, sample_weight=None): # 在此处改代码 # 该指标可以计算有多少样本被正确分类为属于给定类: y_pred = tf.reshape(tf.argmax(y_pred, axis=1), shape=(-1, 1)) values = tf.cast(y_true, "int32") == tf.cast(y_pred, "int32") values = tf.cast(values, "float32") print('values',values) print('sample_weight',sample_weight) if sample_weight is not None: sample_weight = tf.cast(sample_weight, "float32") values = tf.multiply(values, sample_weight) self.true_positives.assign_add(tf.reduce_sum(values)) def result(self): return self.true_positives def reset_states(self): # The state of the metric will be reset at the start of each epoch. self.true_positives.assign(0.0) model = get_uncompiled_model() # 将自定义指标传入metrics model.compile( optimizer=keras.optimizers.RMSprop(learning_rate=1e-3), loss=keras.losses.SparseCategoricalCrossentropy(), metrics=[CategoricalTruePositives()], ) model.fit(x_train, y_train, batch_size=64, epochs=3)参考连接:https://tensorflow.google.cn/api_docs/python/tf/keras/callbacks/?hl=zh_cn
(1)EarlyStopping
model = get_compiled_model() callbacks = [ keras.callbacks.EarlyStopping( # Stop training when `val_loss` is no longer improving monitor="val_loss", # "no longer improving" being defined as "no better than 1e-2 less" min_delta=1e-2, # "no longer improving" being further defined as "for at least 2 epochs" patience=2, verbose=1, ) ] model.fit( x_train, y_train, epochs=20, batch_size=64, callbacks=callbacks, validation_split=0.2, )(2)LearningRateScheduler
训练过程动态调整学习了
# This function keeps the initial learning rate for the first ten epochs # and decreases it exponentially after that. def scheduler(epoch, lr): if epoch < 10: return lr else: return np.float(lr * tf.math.exp(-0.1)) callbacks = [tf.keras.callbacks.LearningRateScheduler(scheduler)] model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)]) model.compile(optimizer=tf.keras.optimizers.SGD(0.1), loss='mse') history = model.fit(np.arange(100).reshape(5, 20), np.zeros(5), epochs=6, callbacks=callbacks, verbose=1)(3)ReduceLROnPlateau
当指标停止改善时,降低学习率。
动态监督val_loss,当val_loss停止改善时(patience=5个epoch内),学习率将会按照这个规则:new_lr = lr * factor进行调整
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.001) model.fit(X_train, Y_train, callbacks=[reduce_lr])