This article was lost due to an accidental operation; fortunately I still had the Markdown backup.

Part 1: Optimization Tricks and Common Algorithms

1. Speeding up pip with the Alibaba Cloud mirror

-i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
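
For example, these flags can be appended to an ordinary pip install (the package name below is only an illustration):

pip install tensorflow -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com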

2. Parameter initialization in handwritten digit recognition

See Section 2-13 for the full example. At first I initialized the weights from a truncated normal distribution with a standard deviation of 1, and convergence was very slow.

w1 = tf.Variable(tf.random.truncated_normal([784, 256], stddev=1))
b1 = tf.Variable(tf.zeros([256]))
w2 = tf.Variable(tf.random.truncated_normal([256, 128], stddev=1))
b2 = tf.Variable(tf.zeros([128]))
w3 = tf.Variable(tf.random.truncated_normal([128, 10], stddev=1))
b3 = tf.Variable(tf.zeros([10]))

When the standard deviation of the normal distribution is reduced to 0.1, convergence is much faster:

w1 = tf.Variable(tf.random.truncated_normal([784, 256], stddev=0.1))
b1 = tf.Variable(tf.zeros([256]))
w2 = tf.Variable(tf.random.truncated_normal([256, 128], stddev=0.1))
b2 = tf.Variable(tf.zeros([128]))
w3 = tf.Variable(tf.random.truncated_normal([128, 10], stddev=0.1))
b3 = tf.Variable(tf.zeros([10]))

Clearly, parameter initialization has a significant impact on how well a neural network trains.
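
A related option (not used in the example above, just an illustration) is to draw the initial weights from one of the Keras built-in initializers, for example Glorot initialization:

init = tf.keras.initializers.GlorotNormal()
# initializers are callable with a shape and return a tensor of initial values
w1 = tf.Variable(init([784, 256]))
b1 = tf.Variable(tf.zeros([256]))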

3. Computing top-k accuracy

The full example is in Section 2-9.

# Top-k accuracy
def accuracy(output, target, topk=(1,)):
    maxk = max(topk)
    batch_size = target.shape[0]

    pred = tf.math.top_k(output, maxk).indices
    pred = tf.transpose(pred, perm=[1, 0])
    target_ = tf.broadcast_to(target, pred.shape)
    # [10, b]
    correct = tf.equal(pred, target_)

    res = []
    for k in topk:
        correct_k = tf.cast(tf.reshape(correct[:k], [-1]), dtype=tf.float32)
        correct_k = tf.reduce_sum(correct_k)
        acc = float(correct_k* (100.0 / batch_size) )
        res.append(acc)

    return res

4. TensorFlow suddenly hangs during training

In my tests this is usually caused by running out of GPU memory; reducing the batch size generally fixes it. Updating the GPU driver can also help.
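
Another mitigation, not used in the original notes but worth knowing, is to enable GPU memory growth so that TensorFlow allocates memory on demand instead of reserving it all at start-up (a minimal sketch, assuming at least one visible GPU):

import tensorflow as tf

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    # allocate GPU memory incrementally instead of grabbing it all at once
    tf.config.experimental.set_memory_growth(gpu, True)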

5. TensorBoard visualization

The full example is in Section 2-14.

current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = '../logs/' + current_time
summary_writer = tf.summary.create_file_writer(log_dir)
# Write the scalar to the log directory for visualization
with summary_writer.as_default():
    tf.summary.scalar('train-loss', float(loss), step=step)
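
The logs can then be viewed by pointing TensorBoard at the log directory:

tensorboard --logdir ../logs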

6. Rescale the inputs to a small range when using ReLU

When classifying CIFAR10, the following preprocessing makes convergence very slow, or prevents convergence entirely:

def preprocess(x, y):
    x = tf.cast(x, dtype=tf.float32) 
    x = tf.reshape(x, [-1, 32*32*3])
    y = tf.cast(y, dtype=tf.int32)
    y = tf.squeeze(y)
    y = tf.one_hot(y, depth=10)
    return x, y

Rescaling the input data to the [-1, 1] range gives much better performance:

def preprocess(x, y):
    x = 2 * tf.cast(x, dtype=tf.float32) / 255 - 1
    x = tf.reshape(x, [-1, 32*32*3])
    y = tf.cast(y, dtype=tf.int32)
    y = tf.squeeze(y)
    y = tf.one_hot(y, depth=10)
    return x, y

See Section 3-4 for the full example.

7. Using Dropout to prevent overfitting

layers.Dense(256, activation=tf.nn.relu),
layers.Dropout(0.5),
layers.Dense(128, activation=tf.nn.relu),
layers.Dropout(0.5),

8. Using L2 regularization

layers.Dense(256, activation=tf.nn.relu, kernel_regularizer=keras.regularizers.l2(0.001))
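
When training with a custom GradientTape loop (as in most of the examples below), the penalty added by kernel_regularizer is collected in model.losses and has to be added to the loss by hand. A minimal sketch, assuming model, x and y as in the surrounding examples:

with tf.GradientTape() as tape:
    out = model(x)
    ce = tf.reduce_mean(tf.losses.categorical_crossentropy(y, out, from_logits=True))
    # add the L2 penalties contributed by kernel_regularizer
    loss = ce + tf.add_n(model.losses)
grads = tape.gradient(loss, model.trainable_variables)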

9. Custom loss functions in Keras

# Subclass Loss and override the call method
class DistanceLoss(Loss):
    def call(self, y_true, y_pred):
        loss = tf.losses.categorical_crossentropy(y_true, y_pred, from_logits=True)
        loss = tf.reduce_mean(loss)
        loss = loss + (0.3*loss - 0.3)**2
        return loss

model.compile(optimizer=optimizers.Adam(lr=0.0001),
                  loss=DistanceLoss(),
                  metrics=['accuracy'])

See Section 4-1 for the full example. I noticed that overfitting set in once the loss dropped to about 1, so I tried to constrain the loss this way, but it did not help.

10. Using a ResNet-style network

class ResNet(layers.Layer):
    def __init__(self, listOfLayer, shortcuts=None, activation=None):
        super(ResNet, self).__init__()
        self.model = Sequential(listOfLayer)
        self.activation = activation
        if shortcuts is None:
            self.shortcut = lambda x: x
        else:
            self.shortcut = Sequential(shortcuts)

    def call(self, inputs, **kwargs):
        residual = self.shortcut(inputs)
        out = self.model(inputs)
        if self.activation is None:
            return layers.add([residual, out])
        else:
            return self.activation(residual + out)

def resNet(filter_num, strides):
    if strides == 1:
        return ResNet(listOfLayer=[
            layers.Conv2D(filter_num, (3, 3), strides=1, padding="same"),
            layers.BatchNormalization(),
            layers.Activation("relu"),
            layers.Conv2D(filter_num, (3, 3), strides=1, padding="same"),
            layers.BatchNormalization(),
            layers.Activation("relu")
        ], activation=tf.nn.relu)
    else:
        return ResNet(listOfLayer=[
            layers.Conv2D(filter_num, (3, 3), strides=strides, padding="same"),
            layers.BatchNormalization(),
            layers.Activation("relu"),
            layers.Conv2D(filter_num, (3, 3), strides=1, padding="same"),
            layers.BatchNormalization(),
            layers.Activation("relu")
        ], shortcuts=[

            layers.Conv2D(filter_num, (1, 1), strides=strides, padding="same", kernel_regularizer=regularizers.l2(5e-5),
                          use_bias=False, kernel_initializer='glorot_normal')
        ], activation=tf.nn.relu)

See Section 4-2 for the full example.

11. KL divergence between two normal distributions

\begin{aligned}
    KL(p,q) &= -\int p(x)\log q(x)\,dx + \int p(x)\log p(x)\,dx \\
    &= \frac{1}{2}\log(2\pi\sigma_{2}^2) + \frac{\sigma_{1}^2+(\mu_{1}-\mu_{2})^2}{2\sigma_2^2} - \frac{1}{2}\left(1+\log 2\pi\sigma_1^2\right) \\
    &= \log\frac{\sigma_2}{\sigma_1} + \frac{\sigma_{1}^2+(\mu_{1}-\mu_{2})^2}{2\sigma_2^2} - \frac{1}{2}
\end{aligned}

Reference: https://stats.stackexchange.com/questions/7440/kl-divergence-between-two-univariate-gaussians
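
Here p = N(mu1, sigma1^2) and q = N(mu2, sigma2^2). A quick numerical check of the closed form (a small sketch, not part of the original notes):

import numpy as np

def kl_gauss(mu1, sigma1, mu2, sigma2):
    # KL( N(mu1, sigma1^2) || N(mu2, sigma2^2) ) from the formula above
    return np.log(sigma2 / sigma1) + (sigma1 ** 2 + (mu1 - mu2) ** 2) / (2 * sigma2 ** 2) - 0.5

print(kl_gauss(0.0, 1.0, 0.0, 1.0))  # 0.0 for identical distributions
print(kl_gauss(1.0, 1.0, 0.0, 2.0))  # about 0.44, positive otherwise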

12. Output size of a transposed convolution

out = s(i - 1) + k - 2p
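
For example, an input of size i = 9 with kernel size k = 5, stride s = 2 and padding p = 0 gives out = 2 * (9 - 1) + 5 - 0 = 21, which matches the shape comments in the GAN generator in Part 8.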

Reference: https://arxiv.org/pdf/1603.07285v1.pdf

13. Tips for training GANs

  • Use leaky_relu as the activation (see the sketch below)
  • Use tanh as the output activation
  • Use WGAN to make training more robust to the hyperparameters
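
A minimal sketch of the first two tips, using the same kinds of layers as the GAN code in Part 8 (the filter sizes are only illustrative):

# a discriminator-style convolution with a leaky ReLU activation
layers.Conv2D(64, kernel_size=[5, 5], padding="same", strides=3, activation=tf.nn.leaky_relu)
# a generator output layer squashed to [-1, 1] with tanh, matching inputs scaled to [-1, 1]
layers.Conv2DTranspose(3, 4, 3, 'valid', activation=tf.tanh)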

14. Loading Keras built-in models

# Load a pre-built network shipped with Keras
vgg19 = keras.applications.VGG19(weights="imagenet", include_top=False, pooling="max")

See Section 6-1 for the full example.

15. Reference links

Part 2: Basic TensorFlow Operations

1. Tensor data types

import tensorflow as tf
import numpy as np

# Print the TensorFlow version
print(tf.version.VERSION)
# Create an int32 Tensor
print(tf.constant(1))
# Create a float32 Tensor
print(tf.constant(1.1))
# Create a float64 Tensor with an explicit dtype
print(tf.constant(2.2, dtype=tf.float64))
# Create a bool Tensor
print(tf.constant([True, False]))
# Create a string Tensor (string Tensors are rarely needed)
print(tf.constant('hello'))

a = tf.range(5)
# Wrap Tensors in Variables
b = tf.Variable(a)
c = tf.Variable(10)
# Whether the variable is trainable (its gradients are tracked)
print(b.trainable)
print(c.trainable)

2. Creating Tensors

import tensorflow as tf
import numpy as np

# Create a Tensor from a numpy array
print(tf.convert_to_tensor(np.ones([2, 3])))
print(tf.convert_to_tensor(np.ones([2, 3]), dtype=tf.float32))

# Create a Tensor from a Python list
print(tf.constant([1, 2, 3]))
print(tf.convert_to_tensor([1, 2, 3]))
print(tf.convert_to_tensor([[1], [2], [3]]))

# Create a scalar Tensor equal to 0
print(tf.zeros([]))
# Create an all-zero Tensor of shape [1]
print(tf.zeros([1]))
# Create a 2x2 all-zero Tensor
print(tf.zeros([2, 2]))
a = tf.zeros([2, 3, 3])
b = np.zeros([2, 3, 3])
# Create all-zero Tensors with the same shape as another tensor/array
print(tf.zeros_like(a))
print(tf.zeros_like(b))

# Create all-one Tensors
print(tf.ones([2, 2, 3]))
print(tf.ones_like(a))

# Create Tensors filled with a given value
print(tf.fill([2, 3, 3], 6))
print(tf.fill(a.shape, 6))

# Randomly initialized Tensors
print(tf.random.normal([2, 2], mean=0, stddev=1))
# Truncated normal distribution, often used to help avoid vanishing gradients
# Values fall within [ mean - 2 * stddev, mean + 2 * stddev ]
print(tf.random.truncated_normal([2, 2], mean=0, stddev=1))
print(tf.random.uniform([2, 2], maxval=10, minval=0, dtype=tf.int32))

# Random shuffling
idx = tf.range(10)
idx = tf.random.shuffle(idx)
print(idx)
a = tf.random.normal([10, 784])
b = tf.random.uniform([10], maxval=10, minval=0, dtype=tf.int32)
# a and b keep their original pairwise correspondence
print(tf.gather(a, idx))
print(tf.gather(b, idx))

3. Tensor indexing and slicing

import tensorflow as tf
import numpy as np

# Basic indexing
a = tf.ones([1, 5, 5, 4])
print(a[0][0])
print(a[0][0][2])
print(a[0][0][2][2])

# Numpy-style indexing
print(a[0, 0])
print(a[0, 0, 2])
print(a[0, 0, 2, 2])

# start:end indexing and slicing
a = tf.range(10)
# Slicing returns a tensor; indexing returns a scalar
print(a[-1])
# Take all elements
print(a[:])
# Slice from the last element: [9]
print(a[-1:])
# Slice from the second-to-last element: [8, 9]
print(a[-2:])
# Everything except the last element (intervals are half-open)
print(a[:-1])

# start:end:step indexing and slicing
a = tf.range(10)
# Every second element: [0 2 4 6 8]
print(a[::2])
# Every second element in reverse order: [9 7 5 3 1]
print(a[::-2])

# Use ... to fill in the remaining dimensions automatically
a = tf.ones([1, 5, 5, 4])
print(a[0, :, :, :])
print(a[0, ...])
print(a[..., 0])

4. Advanced indexing and slicing

import tensorflow as tf
import numpy as np

a = tf.random.normal([8, 35, 8])
# Take indices 2 and 3 along axis 0
print(tf.gather(a, axis=0, indices=[2, 3]).shape)
# Take indices 1, 4 and 5 along axis 0
print(tf.gather(a, axis=0, indices=[1, 4, 5]).shape)
# Take indices 1 and 32 along axis 1
print(tf.gather(a, axis=1, indices=[1, 32]).shape)
# a[0]
print(tf.gather_nd(a, [0]).shape)
# a[0,1,2]
print(tf.gather_nd(a, [0, 1, 2]).shape)
# [a[1], a[2], a[5])]
print(tf.gather_nd(a, [[1], [2], [5]]).shape)
# [a[1, 2], a[2, 1], a[5, 3])]
print(tf.gather_nd(a, [[1, 2], [2, 1], [5, 3]]).shape)
# Keep only a[0] along axis 0 (the mask length must match that dimension)
print(tf.boolean_mask(a, mask=[True] + [False] * 7))
# Keep only index 0 along the last axis, like a[:, :, 0:1]
print(tf.boolean_mask(a, mask=[True] + [False] * 7, axis=2))

5. Changing Tensor dimensions

import tensorflow as tf
import numpy as np

a = tf.random.normal([100, 28, 28, 3])
# Reshape; -1 means that dimension is inferred automatically
print(tf.reshape(a, [100, -1, 3]).shape)

a = tf.range(16)
a = tf.reshape(a, [4, 4])
# New axis 0 is the old axis 1, and new axis 1 is the old axis 0 (transpose)
a = tf.transpose(a, perm=[1, 0])
print(a)

# Expand dim
a = tf.random.normal([2, 28, 28, 3])
print(tf.expand_dims(a, axis=0).shape)
print(tf.expand_dims(a, axis=-1).shape)
print(tf.expand_dims(a, axis=-2).shape)

# Squeeze dim
a = tf.random.normal([2, 1, 1, 3])
print(tf.squeeze(a).shape)

6. Tensor broadcasting

import tensorflow as tf
import numpy as np

# Broadcasting
a = tf.ones([4, 1])
b = tf.ones([1, 4])
print(a+b)
# No extra storage is allocated; broadcasting happens at computation time
print(tf.broadcast_to(a, [4, 4]))

7. Merging and splitting Tensors

import tensorflow as tf

# Concatenating tensors
a = tf.ones([3, 35, 8])
b = tf.ones([6, 35, 8])

# Concatenate along axis 0; no new dimension is created: (9, 35, 8)
print(tf.concat([a, b], axis=0).shape)

# Stack along a new leading dimension: (2, 6, 35, 8)
a = tf.ones([6, 35, 8])
b = tf.ones([6, 35, 8])
print(tf.stack([a, b], axis=0).shape)
# Unstack along axis 0 into 6 tensors of shape (35, 8)
print(tf.unstack(a, axis=0))

# Split along axis 0 into shapes (1, 35, 8), (2, 35, 8) and (3, 35, 8)
print(tf.split(a, axis=0, num_or_size_splits=[1, 2, 3]))

8. Tensor statistics

import tensorflow as tf

# L2 norm: the square root of the sum of squares of all elements
a = tf.ones([2, 2])
print(tf.norm(a))

# L2 norm along a given axis
print(tf.norm(a, axis=1))
# L1 norm along a given axis
print(tf.norm(a, axis=1, ord=1))

# reduce_min/max/mean argmax/argmin
a = tf.random.normal([4, 10])
print(tf.reduce_max(a, axis=0))
print(tf.argmax(a, axis=0))

# equal
a = tf.constant([1, 1, 3, 2, 5])
b = tf.range(5)
print(tf.equal(a, b))

# Computing accuracy
a = tf.constant([[0.1, 0.2, 0.7], [0.1, 0.5, 0.4]])
# [2, 1]
pred = tf.cast(tf.argmax(a, axis=1), dtype=tf.int32)
print(pred)
y = tf.constant([2, 0])
correct = tf.equal(pred, y)
print(tf.reduce_mean(tf.cast(correct,dtype=tf.float32)))

# Remove duplicate elements
a = tf.constant([4, 2, 2, 4, 3])
ua, idx = tf.unique(a)
print(ua)
print(tf.gather(ua, axis=0, indices=idx))

9. Sorting Tensors

import tensorflow as tf
a = tf.random.shuffle(tf.range(5))
# Sort in descending order
print(tf.sort(a, direction="DESCENDING"))
# Indices that would sort in descending order
print(tf.argsort(a, direction="DESCENDING"))

# Top-k accuracy
def accuracy(output, target, topk=(1,)):
    maxk = max(topk)
    batch_size = target.shape[0]

    pred = tf.math.top_k(output, maxk).indices
    pred = tf.transpose(pred, perm=[1, 0])
    target_ = tf.broadcast_to(target, pred.shape)
    # [10, b]
    correct = tf.equal(pred, target_)

    res = []
    for k in topk:
        correct_k = tf.cast(tf.reshape(correct[:k], [-1]), dtype=tf.float32)
        correct_k = tf.reduce_sum(correct_k)
        acc = float(correct_k* (100.0 / batch_size) )
        res.append(acc)

    return res

10. Padding and tiling Tensors

import tensorflow as tf

a = tf.range(9)
a = tf.reshape(a, [3, 3])

# Pad one row of zeros before and one row after along axis 0
# [[0 0 0]
#  [0 1 2]
#  [3 4 5]
#  [6 7 8]
#  [0 0 0]]
a = tf.pad(a, [[1, 1], [0, 0]])
print(a)
# Keep axis 0 unchanged; repeat axis 1 twice
# [[0 0 0 0 0 0]
#  [0 1 2 0 1 2]
#  [3 4 5 3 4 5]
#  [6 7 8 6 7 8]
#  [0 0 0 0 0 0]]
a = tf.tile(a, [1, 2])
print(a)

11. Clipping Tensors

import tensorflow as tf

a = tf.range(9)
# Clip the minimum to 2: [2 2 2 3 4 5 6 7 8]
print(tf.maximum(a, 2))

# Clip the maximum to 2: [0 1 2 2 2 2 2 2 2]
print(tf.minimum(a, 2))
# Clip to the range [2, 8]: [2 2 2 3 4 5 6 7 8]
print(tf.clip_by_value(a, 2, 8))

a = tf.random.normal([2, 2], mean=10)
# Rescale so that the L2 norm is at most 15, without changing the direction
print(tf.clip_by_norm(a, 15))
# Rescale all gradients so that their global norm is at most 25
# new_grads,total_norm = tf.clip_by_global_norm(grads, 25)

12. Finding a function minimum with gradient descent

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

def f(x):
    return (x[0] ** 2 + x[1] - 11)**2 + (x[0] + x[1]**2 - 7)**2

x = np.arange(-6, 6, 0.1)
y = np.arange(-6, 6, 0.1)
X, Y = np.meshgrid(x, y)
Z = f([X, Y])

fig = plt.figure("")
ax = fig.gca(projection="3d")
ax.plot_surface(X, Y, Z)
ax.view_init(60, -30)
plt.show()

x = tf.constant([-4.0, 0.0])

for step in range(200):
    with tf.GradientTape() as tape:
        tape.watch([x])
        y = f(x)
    grads = tape.gradient(y, [x])[0]
    x -= 0.01 * grads
    if step % 20 == 0:
        print("Step:", step, " X:", x.numpy(), "Y:", y.numpy())

13. MNIST handwritten digit recognition

import tensorflow as tf
from tensorflow.keras import datasets
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

(x, y), (x_test, y_test) = datasets.mnist.load_data()

x = tf.convert_to_tensor(x, dtype=tf.float32)/255
y = tf.convert_to_tensor(y, dtype=tf.int32)

x_test = tf.convert_to_tensor(x_test, dtype=tf.float32)/255
y_test = tf.convert_to_tensor(y_test, dtype=tf.int32)
# print(tf.reduce_max(x))

train_db = tf.data.Dataset.from_tensor_slices((x, y)).batch(128)
train_db = train_db.shuffle(x.shape[0])
train_iter = iter(train_db)
# sample = next(train_iter)
# print(sample)

test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(128)

w1 = tf.Variable(tf.random.truncated_normal([784, 256], stddev=0.1))
b1 = tf.Variable(tf.zeros([256]))
w2 = tf.Variable(tf.random.truncated_normal([256, 128], stddev=0.1))
b2 = tf.Variable(tf.zeros([128]))
w3 = tf.Variable(tf.random.truncated_normal([128, 10], stddev=0.1))
b3 = tf.Variable(tf.zeros([10]))
lr = 1e-2
for epoch in range(30):
    for step, (x, y) in enumerate(train_db):
        x = tf.reshape(x, [-1, 28*28])
        y = tf.one_hot(y, depth=10)
        with tf.GradientTape() as tape:
            h1 = x@w1 + b1
            h1 = tf.nn.relu(h1)
            h2 = h1@w2 + b2
            h2 = tf.nn.relu(h2)
            out = h2@w3 + b3
            # categorical_crossentropy expects (y_true, y_pred); pass the logits directly
            loss = tf.losses.categorical_crossentropy(y, out, from_logits=True)
            loss = tf.reduce_mean(loss)

            # loss = tf.square(y - out)
            # loss = tf.reduce_mean(loss)

        grads = tape.gradient(loss,[w1, b1, w2, b2, w3, b3])
        # print(grads)
        # w1.assign_sub(lr * grads[0])
        w1 = tf.Variable(w1 - lr * grads[0])
        b1 = tf.Variable(b1 - lr * grads[1])
        w2 = tf.Variable(w2 - lr * grads[2])
        b2 = tf.Variable(b2 - lr * grads[3])
        w3 = tf.Variable(w3 - lr * grads[4])
        b3 = tf.Variable(b3 - lr * grads[5])

        if step % 100 == 0:
            loss = tf.reduce_mean(loss)
            print("Epoch", epoch, "Step:", step, "Loss:", float(loss))

    total_correct = 0
    total_num = 0
    for step, (x, y) in enumerate(test_db):
        x = tf.reshape(x, [-1, 28 * 28])

        h1 = x @ w1 + b1
        h1 = tf.nn.relu(h1)
        h2 = h1 @ w2 + b2
        h2 = tf.nn.relu(h2)
        out = h2 @ w3 + b3
        prob = tf.nn.softmax(out)
        # [b, 10]
        pred = tf.argmax(prob, axis=1)
        pred = tf.cast(pred, dtype=tf.int32)
        correct = tf.cast(tf.equal(pred, y),dtype=tf.int32)
        correct = tf.reduce_sum(correct)
        total_correct += int(correct)
        total_num += x.shape[0]

    print("Acc:", total_correct/total_num)

14. TensorBoard visualization

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics

def plot_to_image(figure):
    """Converts the matplotlib plot specified by 'figure' to a PNG image and
    returns it. The supplied figure is closed and inaccessible after this call."""
    # Save the plot to a PNG in memory.
    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    # Closing the figure prevents it from being displayed directly inside
    # the notebook.
    plt.close(figure)
    buf.seek(0)
    # Convert PNG buffer to TF image
    image = tf.image.decode_png(buf.getvalue(), channels=4)
    # Add the batch dimension
    image = tf.expand_dims(image, 0)
    return image

def image_grid(images):
    """Return a 5x5 grid of the MNIST images as a matplotlib figure."""
    # Create a figure to contain the plot.
    figure = plt.figure(figsize=(10, 10))
    for i in range(25):
        # Start next subplot.
        plt.subplot(5, 5, i + 1, title='name')
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(images[i], cmap=plt.cm.binary)

    return figure

def preprocess(x, y):
    x = tf.cast(x, dtype=tf.float32) / 255
    y = tf.cast(y, dtype=tf.int32)
    return x, y

(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.map(preprocess).shuffle(10000).batch(128)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.map(preprocess).batch(128)

model = Sequential([
    layers.Dense(256, activation="relu"),
    layers.Dense(128, activation="relu"),
    layers.Dense(64, activation="relu"),
    layers.Dense(32, activation="relu"),
    layers.Dense(16, activation="relu"),
    layers.Dense(10, activation="relu")
])

model.build(input_shape=[None, 28*28])
model.summary()
optimizer = optimizers.Adam(learning_rate=0.001)

current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = '../logs/' + current_time
summary_writer = tf.summary.create_file_writer(log_dir)

for epoch in range(10):
    for step, (x, y) in enumerate(db_train):
        x = tf.reshape(x, [-1, 28*28])
        y = tf.one_hot(y, depth=10)
        with tf.GradientTape() as tape:
            out = model(x)
            # loss = tf.reduce_mean(tf.losses.MSE(y, out))
            loss = tf.reduce_mean(tf.losses.categorical_crossentropy(y, out, from_logits=True))
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        if step % 100 == 0:
            print("Epoch", epoch, "Step:", step, "Loss:", float(loss))
            with summary_writer.as_default():
                tf.summary.scalar('train-loss', float(loss), step=step)

    total_correct = 0
    total_num = 0
    for (x, y) in db_test:
        x = tf.reshape(x, [-1, 28*28])
        y = tf.cast(y, tf.int32)
        out = model(x)
        prob = tf.nn.softmax(out)
        pred = tf.argmax(prob, axis=1)
        pred = tf.cast(pred, dtype=tf.int32)
        correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
        correct = tf.reduce_sum(correct)
        total_correct += int(correct)
        total_num += x.shape[0]
    print("Acc:", total_correct / total_num)
    with summary_writer.as_default():
        tf.summary.scalar('test-acc', float(total_correct / total_num), step=epoch)
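
plot_to_image and image_grid above are defined but never called; a small sketch of how they could be hooked up to tf.summary.image (my assumption of the intended use, not part of the original code):

# e.g. log a 5x5 grid of training images (this could be done once, before the training loop)
with summary_writer.as_default():
    figure = image_grid(x_train[:25])
    tf.summary.image('train-samples', plot_to_image(figure), step=0)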

Part 3: Keras High-Level API

1. Common Keras APIs

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics

def preprocess(x, y):
    x = tf.cast(x, dtype=tf.float32) / 255
    x = tf.reshape(x, [28 * 28])
    y = tf.cast(y, dtype=tf.int32)
    y = tf.one_hot(y, depth=10)
    return x, y

(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.map(preprocess).shuffle(10000).batch(128)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.map(preprocess).batch(128)

model = Sequential([
    layers.Dense(256, activation="relu"),
    layers.Dense(128, activation="relu"),
    layers.Dense(64, activation="relu"),
    layers.Dense(32, activation="relu"),
    layers.Dense(16, activation="relu"),
    layers.Dense(10, activation="relu")
])

model.build(input_shape=[None, 28*28])

model.compile(optimizer=optimizers.Adam(lr=0.01),
              loss=tf.losses.CategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
# Train for 5 epochs, validating on the test set every 2 epochs
model.fit(db_train, epochs=5, validation_data=db_test, validation_freq=2)

2. Computing running means and accuracy with Keras metrics

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics

def preprocess(x, y):
    x = tf.cast(x, dtype=tf.float32) / 255
    y = tf.cast(y, dtype=tf.int32)
    return x, y

(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.map(preprocess).shuffle(10000).batch(128)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.map(preprocess).batch(128)

model = Sequential([
    layers.Dense(256, activation="relu"),
    layers.Dense(128, activation="relu"),
    layers.Dense(64, activation="relu"),
    layers.Dense(32, activation="relu"),
    layers.Dense(16, activation="relu"),
    layers.Dense(10, activation="relu")
])

model.build(input_shape=[None, 28*28])
model.summary()
optimizer = optimizers.Adam(learning_rate=0.001)

loss_meter = metrics.Mean()
acc_meter = metrics.Accuracy()

for epoch in range(10):
    for step, (x, y) in enumerate(db_train):
        x = tf.reshape(x, [-1, 28*28])
        y = tf.one_hot(y, depth=10)
        with tf.GradientTape() as tape:
            out = model(x)
            # loss = tf.reduce_mean(tf.losses.MSE(y, out))
            loss = tf.reduce_mean(tf.losses.categorical_crossentropy(y, out, from_logits=True))
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        loss_meter.update_state(loss)
        if step % 100 == 0:
            print("Epoch", epoch, "Step:", step, "Loss:", loss_meter.result().numpy())
            loss_meter.reset_states()

    for (x, y) in db_test:
        x = tf.reshape(x, [-1, 28*28])
        y = tf.cast(y, tf.int32)
        out = model(x)
        prob = tf.nn.softmax(out)
        pred = tf.argmax(prob, axis=1)
        pred = tf.cast(pred, dtype=tf.int32)
        acc_meter.update_state(pred, y)
    print("Acc:", acc_meter.result().numpy())
    acc_meter.reset_states()

3. Custom models and layers

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics

class MyDense(layers.Layer):
    def __init__(self, in_dim, out_dim):
        super(MyDense, self).__init__()
        self.kernel = self.add_weight("w", [in_dim, out_dim])
        self.bias = self.add_weight("b", [out_dim])

    def call(self, inputs, training=None):
        out = inputs @ self.kernel + self.bias
        return tf.nn.relu(out)

class MyModel(keras.Model):
    def __init__(self):
        super(MyModel, self).__init__()

        self.fc1 = MyDense(28 * 28, 256)
        self.fc2 = MyDense(256, 128)
        self.fc3 = MyDense(128, 64)
        self.fc4 = MyDense(64, 32)
        self.fc5 = MyDense(32, 10)

    def call(self, inputs, training=None, mask=None):
        x = self.fc1(inputs)
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        x = self.fc5(x)
        return x

def preprocess(x, y):
    x = tf.cast(x, dtype=tf.float32) / 255
    x = tf.reshape(x, [28 * 28])
    y = tf.cast(y, dtype=tf.int32)
    y = tf.one_hot(y, depth=10)
    return x, y

(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.map(preprocess).shuffle(10000).batch(128)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.map(preprocess).batch(128)

model = MyModel()

# model.build(input_shape=[None, 28*28])

model.compile(optimizer=optimizers.Adam(lr=0.01),
              loss=tf.losses.CategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
model.fit(db_train, epochs=5, validation_data=db_test, validation_freq=2)

4. CIFAR10 classification

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics

class MyDense(layers.Layer):
    def __init__(self, in_dim, out_dim):
        super(MyDense, self).__init__()
        self.kernel = self.add_weight("w", [in_dim, out_dim])
        self.bias = self.add_weight("b", [out_dim])

    def call(self, inputs, training=None):
        return tf.nn.relu(inputs @ self.kernel )

class MyNetwork(keras.Model):
    def __init__(self):
        super(MyNetwork, self).__init__()
        self.fc1 = MyDense(32 * 32 * 3, 256)
        self.fc2 = MyDense(256, 128)
        self.fc3 = MyDense(128, 64)
        self.fc4 = MyDense(64, 32)
        self.fc5 = MyDense(32, 10)

    def call(self, inputs, training=None, mask=None):
        x = tf.reshape(inputs, [-1, 32 * 32 * 3])
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        x = self.fc5(x)
        return x

def preprocess(x, y):
    x = 2 * tf.cast(x, dtype=tf.float32) / 255 - 1
    x = tf.reshape(x, [-1, 32*32*3])
    y = tf.cast(y, dtype=tf.int32)
    y = tf.squeeze(y)
    y = tf.one_hot(y, depth=10)
    return x, y

(x_train, y_train), (x_test, y_test) = datasets.cifar10.load_data()
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.map(preprocess).shuffle(10000).batch(128)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.map(preprocess).batch(128)

model = MyNetwork()
model.compile(optimizer=optimizers.Adam(lr=0.001),
              loss=tf.losses.CategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
model.fit(db_train, epochs=15, validation_data=db_test, validation_freq=2)

Part 4: Convolutional Neural Networks

1. CIFAR100 classification with a custom VGG13 network

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
from tensorflow.keras.losses import Loss
import os

tf.random.set_seed(2345)

vgg13 = [

    layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.BatchNormalization(
        axis=-1,
        center=True,
        scale=True,
        trainable=True
    ),
    layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.BatchNormalization(),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding="same"),

    layers.BatchNormalization(),
    layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.BatchNormalization(),
    layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding="same"),
    layers.BatchNormalization(),
    layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.BatchNormalization(),
    layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding="same"),
    layers.BatchNormalization(),
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.BatchNormalization(),
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding="same"),
    layers.BatchNormalization(),
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu, kernel_regularizer=keras.regularizers.l2(0.001)),
    layers.BatchNormalization(),
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu, kernel_regularizer=keras.regularizers.l2(0.001)),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding="same"),

    layers.BatchNormalization(),
    layers.Flatten(),
    layers.Dense(256, activation=tf.nn.relu),
    layers.Dropout(0.3),
    layers.Dense(128, activation=tf.nn.relu),
    layers.Dropout(0.5),
    layers.Dense(100, activation=None)

]

def preprocess(x, y):
    x = 2 * tf.cast(x, dtype=tf.float32) / 255 - 1
    y = tf.cast(y, dtype=tf.int32)
    y = tf.squeeze(y)
    y = tf.one_hot(y, depth=100)
    return x, y

(x_train, y_train), (x_test, y_test) = datasets.cifar100.load_data()
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.map(preprocess).shuffle(10000).batch(128)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.map(preprocess).batch(128)
# Custom loss function
class DistanceLoss(Loss):
    def call(self, y_true, y_pred):
        loss = tf.losses.categorical_crossentropy(y_true, y_pred, from_logits=True)
        loss = tf.reduce_mean(loss)
        loss = loss + (0.3*loss - 0.3)**2
        return loss

model = Sequential(vgg13)
model.compile(optimizer=optimizers.Adam(lr=0.0001),
              loss=tf.losses.CategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
model.fit(db_train, epochs=105, validation_data=db_test, validation_freq=2)

2. CIFAR100 classification with a ResNet

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
from tensorflow.keras import regularizers
from tensorflow.keras.losses import Loss

# Residual block: the output of the stacked layers is added to the (optionally transformed) input
class ResNet(layers.Layer):
    def __init__(self, listOfLayer, shortcuts=None, activation=None):
        super(ResNet, self).__init__()
        self.model = Sequential(listOfLayer)
        self.activation = activation
        if shortcuts is None:
            self.shortcut = lambda x: x
        else:
            self.shortcut = Sequential(shortcuts)

    def call(self, inputs, **kwargs):
        residual = self.shortcut(inputs)
        out = self.model(inputs)
        if self.activation is None:
            return layers.add([residual, out])
        else:
            return self.activation(residual + out)

def resNet(filter_num, strides):
    if strides == 1:
        return ResNet(listOfLayer=[
            layers.Conv2D(filter_num, (3, 3), strides=1, padding="same"),
            layers.BatchNormalization(),
            layers.Activation("relu"),
            layers.Conv2D(filter_num, (3, 3), strides=1, padding="same"),
            layers.BatchNormalization(),
            layers.Activation("relu")
        ], activation=tf.nn.relu)
    else:
        return ResNet(listOfLayer=[
            layers.Conv2D(filter_num, (3, 3), strides=strides, padding="same"),
            layers.BatchNormalization(),
            layers.Activation("relu"),
            layers.Conv2D(filter_num, (3, 3), strides=1, padding="same"),
            layers.BatchNormalization(),
            layers.Activation("relu")
        ], shortcuts=[

            layers.Conv2D(filter_num, (1, 1), strides=strides, padding="same", kernel_regularizer=regularizers.l2(5e-5),
                          use_bias=False, kernel_initializer='glorot_normal')
        ], activation=tf.nn.relu)

tf.random.set_seed(2345)

network = [layers.Conv2D(64, (3, 3), strides=1, padding="same"), layers.BatchNormalization(), layers.Activation("relu"),
           layers.MaxPool2D(pool_size=[2, 2], strides=1, padding="same"), resNet(64, 1), resNet(64, 1), resNet(128, 2),
           resNet(128, 1)]

network.append(resNet(256, 2))
network.append(resNet(256, 1))

network.append(resNet(512, 2))
network.append(resNet(512, 1))

# the pooling and classification head must come after all residual blocks
network.append(layers.GlobalAveragePooling2D())
network.append(layers.Dense(100, activation=None))

img_mean = tf.constant([0.4914, 0.4822, 0.4465])
img_std = tf.constant([0.2023, 0.1994, 0.2010])

def normalize(x, mean=img_mean, std=img_std):
    # x shape: [32, 32, 3] for CIFAR; mean and std have shape [3]
    # Broadcasting aligns shapes from the right:
    # mean: [3] -> [1, 1, 3]   (insert leading 1s)
    # mean: [1, 1, 3] -> [32, 32, 3]  (broadcast over the spatial dims)
    x = (x - mean) / std
    return x

# Data preprocessing: random flip, rescaling and per-channel normalization
def preprocess(x, y):
    x = tf.image.random_flip_left_right(x)
    # x: [0, 255] => [0, 1]
    x = tf.cast(x, dtype=tf.float32) / 255.
    # [0, 1] => roughly zero mean and unit variance per channel
    x = normalize(x)
    y = tf.squeeze(y)
    y = tf.cast(y, dtype=tf.int32)
    y = tf.one_hot(y, depth=100)
    return x, y

# def preprocess(x, y):
#     x = 2 * tf.cast(x, dtype=tf.float32) / 255 - 1
#     y = tf.cast(y, dtype=tf.int32)
#     y = tf.squeeze(y)
#     y = tf.one_hot(y, depth=100)
#     return x, y

(x_train, y_train), (x_test, y_test) = datasets.cifar100.load_data()
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.map(preprocess).shuffle(10000).batch(128)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.map(preprocess).batch(128)

class DistanceLoss(Loss):
    def call(self, y_true, y_pred):
        loss = tf.losses.categorical_crossentropy(y_true, y_pred, from_logits=True)
        loss = tf.reduce_mean(loss)
        loss = loss + (0.3 * loss - 0.3) ** 2
        return loss

model = Sequential(network)
model.build(input_shape=[None, 32, 32, 3])
model.summary()

model.compile(optimizer=optimizers.Adam(lr=1e-4),
              loss=tf.losses.CategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
model.fit(db_train, epochs=1000, validation_data=db_test, validation_freq=2)

Part 5: Recurrent Neural Networks

1. Sentiment classification of IMDB reviews with an RNN

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
tf.compat.v1.disable_eager_execution()
tf.random.set_seed(2345)

(x_train, y_train), (x_test, y_test) = datasets.imdb.load_data(num_words=10000)
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=80)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=80)
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(10000).batch(128, drop_remainder=True)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(128, drop_remainder=True)

class MyRNN(keras.Model):
    def __init__(self, units, batchSize):
        super(MyRNN, self).__init__()
        self.state0 = [tf.zeros([batchSize, units])]
        self.state1 = [tf.zeros([batchSize, units])]
        # [b 80] => [b 80 100]
        self.embedding = layers.Embedding(10000, 100, input_length=80)
        self.rnn_cell0 = layers.SimpleRNNCell(units, dropout=0.2)
        self.rnn_cell1 = layers.SimpleRNNCell(units, dropout=0.2)
        self.fc = layers.Dense(1)

    def call(self, inputs, training=None, mask=None):
        x = self.embedding(inputs)
        state0 = self.state0
        state1 = self.state1
        for word in tf.unstack(x, axis=1):
            out, state0 = self.rnn_cell0(word, state0, training)
            out, state1 = self.rnn_cell1(out, state1, training)
        x = self.fc(out)

        return tf.nn.sigmoid(x)

model = MyRNN(128, 128)
model.compile(optimizer=optimizers.Adam(lr=0.0001),
              loss=tf.losses.BinaryCrossentropy(),
              metrics=['accuracy'])
model.fit(db_train, epochs=15, validation_data=db_test, validation_freq=2)

2. IMDB sentiment classification with Keras' built-in RNN layers

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
tf.compat.v1.disable_eager_execution()
tf.random.set_seed(2345)

(x_train, y_train), (x_test, y_test) = datasets.imdb.load_data(num_words=10000)
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=80)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=80)
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(10000).batch(128, drop_remainder=True)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(128, drop_remainder=True)

class MyRNN(keras.Model):
    def __init__(self, units, batchSize):
        super(MyRNN, self).__init__()
        self.state0 = [tf.zeros([batchSize, units])]
        self.state1 = [tf.zeros([batchSize, units])]
        # [b 80] => [b 80 100]
        self.embedding = layers.Embedding(10000, 100, input_length=80)
        self.rnn = keras.Sequential([
            layers.SimpleRNN(units, dropout=0.5, return_sequences=True, unroll=True),
            layers.SimpleRNN(units, dropout=0.5, unroll=True)
        ])
        self.fc = layers.Dense(1)

    def call(self, inputs, training=None, mask=None):
        x = self.embedding(inputs)
        x = self.rnn(x)
        x = self.fc(x)

        return tf.nn.sigmoid(x)

model = MyRNN(128, 128)
model.compile(optimizer=optimizers.Adam(lr=0.0001),
              loss=tf.losses.BinaryCrossentropy(),
              metrics=['accuracy'])
model.fit(db_train, epochs=15, validation_data=db_test, validation_freq=2)

3. IMDB sentiment classification with an LSTM

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
tf.compat.v1.disable_eager_execution()
tf.random.set_seed(2345)

(x_train, y_train), (x_test, y_test) = datasets.imdb.load_data(num_words=10000)
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=80)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=80)
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(10000).batch(128, drop_remainder=True)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(128, drop_remainder=True)

class MyRNN(keras.Model):
    def __init__(self, units, batchSize):
        super(MyRNN, self).__init__()
        self.state0 = [tf.zeros([batchSize, units]), tf.zeros([batchSize, units])]
        self.state1 = [tf.zeros([batchSize, units]), tf.zeros([batchSize, units])]
        # [b 80] => [b 80 100]
        self.embedding = layers.Embedding(10000, 100, input_length=80)
        self.rnn_cell0 = layers.LSTMCell(units, dropout=0.2)
        self.rnn_cell1 = layers.LSTMCell(units, dropout=0.2)
        self.fc = layers.Dense(1)

    def call(self, inputs, training=None, mask=None):
        x = self.embedding(inputs)
        state0 = self.state0
        state1 = self.state1
        for word in tf.unstack(x, axis=1):
            out, state0 = self.rnn_cell0(word, state0, training)
            out, state1 = self.rnn_cell1(out, state1, training)
        x = self.fc(out)

        return tf.nn.sigmoid(x)

model = MyRNN(128, 128)
model.compile(optimizer=optimizers.Adam(lr=0.0001),
              loss=tf.losses.BinaryCrossentropy(),
              metrics=['accuracy'])
model.fit(db_train, epochs=15, validation_data=db_test, validation_freq=2)

4. IMDB sentiment classification with Keras' built-in LSTM layers

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
tf.compat.v1.disable_eager_execution()
tf.random.set_seed(2345)

(x_train, y_train), (x_test, y_test) = datasets.imdb.load_data(num_words=10000)
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=80)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=80)
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.shuffle(10000).batch(128, drop_remainder=True)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.batch(128, drop_remainder=True)

class MyRNN(keras.Model):
    def __init__(self, units, batchSize):
        super(MyRNN, self).__init__()
        self.state0 = [tf.zeros([batchSize, units])]
        self.state1 = [tf.zeros([batchSize, units])]
        # [b 80] => [b 80 100]
        self.embedding = layers.Embedding(10000, 100, input_length=80)
        self.rnn = keras.Sequential([
            layers.LSTM(units, dropout=0.5, return_sequences=True, unroll=True),
            layers.LSTM(units, dropout=0.5, unroll=True)
        ])
        self.fc = layers.Dense(1)

    def call(self, inputs, training=None, mask=None):
        x = self.embedding(inputs)
        x = self.rnn(x)
        x = self.fc(x)

        return tf.nn.sigmoid(x)

model = MyRNN(128, 128)
model.compile(optimizer=optimizers.Adam(lr=0.0001),
              loss=tf.losses.BinaryCrossentropy(),
              metrics=['accuracy'])
model.fit(db_train, epochs=15, validation_data=db_test, validation_freq=2)

Part 6: Transfer Learning

1. Transfer learning on CIFAR100 with VGG19

import datetime
import io
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
from tensorflow.keras import regularizers

img_mean = tf.constant([0.485, 0.456, 0.406])
img_std = tf.constant([0.229, 0.224, 0.225])
def normalize(x, mean=img_mean, std=img_std):
    # x shape: [32, 32, 3] here ([224, 224, 3] if the resize below is enabled); mean and std have shape [3]
    # Broadcasting aligns shapes from the right:
    # mean: [3] -> [1, 1, 3]   (insert leading 1s)
    # mean: [1, 1, 3] -> [32, 32, 3]  (broadcast over the spatial dims)
    x = (x - mean)/std
    return x
def preprocess(x, y):
    x = tf.cast(x, dtype=tf.float32) / 255.0
    x = tf.image.random_flip_left_right(x)
    # x = tf.image.resize(x, [224, 224])
    x = normalize(x)
    # x = tf.transpose(x, perm=[2,1,0])
    y = tf.cast(y, dtype=tf.int32)
    y = tf.squeeze(y)
    y = tf.one_hot(y, depth=100)
    return x, y

# Load a pre-trained network shipped with Keras
vgg19 = keras.applications.VGG19(weights="imagenet", include_top=False, pooling="max")
(x_train, y_train), (x_test, y_test) = datasets.cifar100.load_data()
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
db_train = db_train.map(preprocess).shuffle(10000).batch(128)

db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test))
db_test = db_test.map(preprocess).batch(128)
vgg19.trainable = False
model = Sequential([
    vgg19,
    layers.Flatten(),
    layers.Dense(1000, activation=tf.nn.relu, kernel_initializer='glorot_normal'),
    layers.Dropout(0.3),
    layers.Dense(500, activation=tf.nn.relu, kernel_initializer='glorot_normal'),
    layers.Dropout(0.3),
    layers.Dense(200, activation=tf.nn.relu, kernel_initializer='glorot_normal'),
    layers.Dense(100)
])

print(vgg19.input_shape)

model.summary()
model.compile(optimizer=optimizers.Adam(lr=0.001),
              loss=tf.losses.CategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])
for i in range(40):
    model.fit(db_train, epochs=2, validation_data=db_test, validation_freq=2)
    db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    db_train = db_train.map(preprocess).shuffle(10000).batch(128)

Part 7: Autoencoders

1. Autoencoding the Fashion-MNIST dataset

import datetime
import io
import tensorflow as tf
from PIL import Image
import numpy as np
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics

tf.random.set_seed(2345)

def save_images(imgs, name):
    new_im = Image.new('L', (280, 280))

    index = 0
    for i in range(0, 280, 28):
        for j in range(0, 280, 28):
            im = imgs[index]
            im = Image.fromarray(im, mode='L')
            new_im.paste(im, (i, j))
            index += 1

    new_im.save(name)

CODE_DIM = 2

(x_train, _1), (x_test, _2) = datasets.fashion_mnist.load_data()
x_train, x_test = x_train.astype(np.float32).reshape([-1, 784]) / 255., x_test.astype(np.float32).reshape([-1, 784]) / 255.
db_train = tf.data.Dataset.from_tensor_slices((x_train, x_train))
db_train = db_train.shuffle(10000).batch(128)
db_test = tf.data.Dataset.from_tensor_slices((x_test, x_test))
db_test = db_test.batch(128)

class AE(keras.Model):
    def __init__(self):
        super(AE, self).__init__()
        self.encoder = Sequential([
            layers.Dense(256, activation=tf.nn.relu),
            layers.Dense(128, activation=tf.nn.relu),
            layers.Dense(CODE_DIM)
        ])

        self.decoder = Sequential([
            layers.Dense(128, activation=tf.nn.relu),
            layers.Dense(256, activation=tf.nn.relu),
            layers.Dense(784)
        ])

    def call(self, inputs, training=None, mask=None):
        x = self.encoder(inputs)
        x = self.decoder(x)
        return x

model = AE()
model.build(input_shape=(None, 784))
model.summary()
model.compile(optimizer=optimizers.Adam(lr=0.01),
              loss="mse",
              metrics=['accuracy'])
for i in range(5):
    print("===============================================")
    model.fit(db_train, epochs=10)
    x, y = next(iter(db_test))
    y = tf.reshape(y, [-1, 28, 28])
    out = model(tf.reshape(x, [-1, 28*28]))
    out = tf.reshape(out, [-1, 28, 28])

    true_fake = tf.concat([y[0:50] ,out[0:50]], axis=0)
    true_fake = true_fake.numpy() * 255
    true_fake = true_fake.astype(np.uint8)

    save_images(true_fake, str(i)+"test.jpg")

2. Variational Autoencoder

import datetime
import io
import tensorflow as tf
from PIL import Image
import numpy as np
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics

tf.random.set_seed(2345)

def save_images(imgs, name):
    new_im = Image.new('L', (280, 280))

    index = 0
    for i in range(0, 280, 28):
        for j in range(0, 280, 28):
            im = imgs[index]
            im = Image.fromarray(im, mode='L')
            new_im.paste(im, (i, j))
            index += 1

    new_im.save(name)

BATCH_SIZE = 128
CODE_DIM = 20

(x_train, _1), (x_test, _2) = datasets.mnist.load_data()
x_train, x_test = x_train.astype(np.float32).reshape([-1, 28, 28, 1]) / 255., x_test.astype(np.float32).reshape([-1, 28, 28, 1]) / 255.
db_train = tf.data.Dataset.from_tensor_slices(x_train)
db_train = db_train.shuffle(10000).batch(BATCH_SIZE)
db_test = tf.data.Dataset.from_tensor_slices(x_test)
db_test = db_test.batch(50, drop_remainder=True)

def reparameter(mu, log_var):
    eps = tf.random.normal(log_var.shape)

    std = tf.exp(log_var * 0.5)

    z = mu + std * eps
    return z

class VariationalAutoEncoder(keras.Model):
    def __init__(self):
        super(VariationalAutoEncoder, self).__init__()
        self.encoder = Sequential([
            layers.Conv2D(64, strides=2, kernel_size=[1, 1], padding="same", activation=tf.nn.relu),
            layers.Conv2D(128, strides=2, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
            layers.Conv2D(256, strides=2, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
            layers.Flatten(),
            layers.Dense(500, activation=tf.nn.relu)
        ])
        self.mean = Sequential([
            layers.Dense(CODE_DIM, activation=tf.nn.relu)
        ])
        self.variance = Sequential([
            layers.Dense(CODE_DIM, activation=tf.nn.relu)
        ])
        self.decoder = Sequential([
            layers.Flatten(),
            layers.Dense(1024, activation=tf.nn.relu),
            layers.Dense(900, activation=tf.nn.relu),
            layers.Dense(784)
        ])

    def call(self, inputs, training=None, mask=None):
        x = self.encoder(inputs)
        mean = self.mean(x)
        log_var = self.variance(x)
        z = reparameter(mean, log_var)
        out = self.decoder(z)
        return out, mean, log_var

VAE = VariationalAutoEncoder()
optimizer = tf.optimizers.Adam(1e-3)

for epoch in range(1000):

    for step, x in enumerate(db_train):

        rx = tf.reshape(x, [-1, 784])

        with tf.GradientTape() as tape:
            out, mean, log_var = VAE(x)

            rec_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=rx, logits=out)
            rec_loss = tf.reduce_sum(rec_loss) / x.shape[0]

            # compute kl divergence (mu, var) ~ N (0, 1)
            # https://stats.stackexchange.com/questions/7440/kl-divergence-between-two-univariate-gaussians
            kl_div = -0.5 * (log_var + 1 - mean**2 - tf.exp(log_var))
            kl_div = tf.reduce_sum(kl_div) / x.shape[0]

            loss = rec_loss + 3. * kl_div

        grads = tape.gradient(loss, VAE.trainable_variables)
        optimizer.apply_gradients(zip(grads, VAE.trainable_variables))

        if step % 100 == 0:
            print(epoch, step, 'kl div:', float(kl_div), 'rec loss:', float(rec_loss))

    z = tf.random.normal((100, CODE_DIM))
    logits = VAE.decoder(z)
    x_hat = tf.sigmoid(logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() * 255.
    x_hat = x_hat.astype(np.uint8)
    save_images(x_hat, 'vae_images/sampled_epoch%d.png' % epoch)

Part 8: Generative Adversarial Networks

1. GAN

import glob

import numpy as np
import io

from PIL import Image
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
from tensorflow.keras import regularizers
import multiprocessing

def make_anime_dataset(img_paths, batch_size, resize=64, drop_remainder=True, shuffle=True, repeat=1):
    @tf.function
    def _map_fn(img):
        img = tf.image.resize(img, [resize, resize])
        img = tf.clip_by_value(img, 0, 255)
        img = img / 127.5 - 1
        return img

    dataset = disk_image_batch_dataset(img_paths,
                                       batch_size,
                                       drop_remainder=drop_remainder,
                                       map_fn=_map_fn,
                                       shuffle=shuffle,
                                       repeat=repeat)
    img_shape = (resize, resize, 3)
    len_dataset = len(img_paths) // batch_size

    return dataset, img_shape, len_dataset

def batch_dataset(dataset,
                  batch_size,
                  drop_remainder=True,
                  n_prefetch_batch=1,
                  filter_fn=None,
                  map_fn=None,
                  n_map_threads=None,
                  filter_after_map=False,
                  shuffle=True,
                  shuffle_buffer_size=None,
                  repeat=None):
    # set defaults
    if n_map_threads is None:
        n_map_threads = multiprocessing.cpu_count()
    if shuffle and shuffle_buffer_size is None:
        shuffle_buffer_size = max(batch_size * 128, 2048)  # set the minimum buffer size as 2048

    # [*] it is efficient to conduct `shuffle` before `map`/`filter` because `map`/`filter` is sometimes costly
    if shuffle:
        dataset = dataset.shuffle(shuffle_buffer_size)

    if not filter_after_map:
        if filter_fn:
            dataset = dataset.filter(filter_fn)

        if map_fn:
            dataset = dataset.map(map_fn, num_parallel_calls=n_map_threads)

    else:  # [*] this is slower
        if map_fn:
            dataset = dataset.map(map_fn, num_parallel_calls=n_map_threads)

        if filter_fn:
            dataset = dataset.filter(filter_fn)

    dataset = dataset.batch(batch_size, drop_remainder=drop_remainder)

    dataset = dataset.repeat(repeat).prefetch(n_prefetch_batch)

    return dataset

def memory_data_batch_dataset(memory_data,
                              batch_size,
                              drop_remainder=True,
                              n_prefetch_batch=1,
                              filter_fn=None,
                              map_fn=None,
                              n_map_threads=None,
                              filter_after_map=False,
                              shuffle=True,
                              shuffle_buffer_size=None,
                              repeat=None):
    """Batch dataset of memory data.

    Parameters
    ----------
    memory_data : nested structure of tensors/ndarrays/lists

    """
    dataset = tf.data.Dataset.from_tensor_slices(memory_data)
    dataset = batch_dataset(dataset,
                            batch_size,
                            drop_remainder=drop_remainder,
                            n_prefetch_batch=n_prefetch_batch,
                            filter_fn=filter_fn,
                            map_fn=map_fn,
                            n_map_threads=n_map_threads,
                            filter_after_map=filter_after_map,
                            shuffle=shuffle,
                            shuffle_buffer_size=shuffle_buffer_size,
                            repeat=repeat)
    return dataset

def disk_image_batch_dataset(img_paths,
                             batch_size,
                             labels=None,
                             drop_remainder=True,
                             n_prefetch_batch=1,
                             filter_fn=None,
                             map_fn=None,
                             n_map_threads=None,
                             filter_after_map=False,
                             shuffle=True,
                             shuffle_buffer_size=None,
                             repeat=None):
    """Batch dataset of disk image for PNG and JPEG.

    Parameters
    ----------
        img_paths : 1d-tensor/ndarray/list of str
        labels : nested structure of tensors/ndarrays/lists

    """
    if labels is None:
        memory_data = img_paths
    else:
        memory_data = (img_paths, labels)

    def parse_fn(path, *label):
        img = tf.io.read_file(path)
        img = tf.image.decode_png(img, 3)  # fix channels to 3
        return (img,) + label

    if map_fn:  # fuse `map_fn` and `parse_fn`
        def map_fn_(*args):
            return map_fn(*parse_fn(*args))
    else:
        map_fn_ = parse_fn

    dataset = memory_data_batch_dataset(memory_data,
                                        batch_size,
                                        drop_remainder=drop_remainder,
                                        n_prefetch_batch=n_prefetch_batch,
                                        filter_fn=filter_fn,
                                        map_fn=map_fn_,
                                        n_map_threads=n_map_threads,
                                        filter_after_map=filter_after_map,
                                        shuffle=shuffle,
                                        shuffle_buffer_size=shuffle_buffer_size,
                                        repeat=repeat)

    return dataset

discriminator = Sequential([
    layers.Conv2D(64, kernel_size=[5, 5], padding="same", strides=3, activation=tf.nn.leaky_relu),
    layers.BatchNormalization(),
    layers.Conv2D(128, kernel_size=[5, 5], padding="same", strides=3, activation=tf.nn.leaky_relu),
    layers.BatchNormalization(),
    layers.Conv2D(256, kernel_size=[5, 5], padding="same", strides=3, activation=tf.nn.leaky_relu),
    layers.BatchNormalization(),
    layers.Flatten(),
    layers.Dense(128, activation=tf.nn.leaky_relu),
    layers.Dense(64, activation=tf.nn.leaky_relu),
    layers.Dense(32, activation=tf.nn.leaky_relu),
    layers.Dense(16, activation=tf.nn.leaky_relu),
    layers.Dense(1)
])

generator = Sequential([
    layers.Dense(3 * 3 * 512, activation=tf.nn.leaky_relu),
    layers.Reshape((3, 3, 512)),
    # 3 -> 9: (3-1)*3 + 3 = 9
    layers.Conv2DTranspose(256, 3, 3, 'valid', activation=tf.nn.leaky_relu),
    layers.BatchNormalization(),
    # (9-1)*2 + 5 = 21
    layers.Conv2DTranspose(128, 5, 2, 'valid', activation=tf.nn.leaky_relu),
    layers.BatchNormalization(),
    # (21-1)*3 + 4 = 64
    layers.Conv2DTranspose(3, 4, 3, 'valid', activation=tf.tanh),

])

def save_images(imgs, name):
    new_im = Image.new('RGB', (640, 640))

    index = 0
    for i in range(0, 640, 64):
        for j in range(0, 640, 64):
            im = imgs[index]
            im = np.array(im)
            im = ((im + 1.0) * 127.5).astype(np.uint8)
            im = Image.fromarray(im, mode='RGB')
            new_im.paste(im, (i, j))
            index += 1

    new_im.save(name)

def d_loss_fn(generator, discriminator, batch_z, batch_x):
    fake_images = generator(batch_z)
    fake_out = discriminator(fake_images)
    real_out = discriminator(batch_x)

    fake_loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=fake_out, labels=tf.zeros_like(fake_out))
    real_loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=real_out, labels=tf.ones_like(real_out))

    fake_loss = tf.reduce_mean(fake_loss)
    real_loss = tf.reduce_mean(real_loss)

    return fake_loss + real_loss

BATCH_SIZE = 512
CODE_SIZE = 100
LEARNING_RATE = 0.0002
EPOCHS = 20000
img_path = glob.glob(r"E:\BaiduNetdiskDownload\faces\*.jpg")
dataset, image_shape, _ = make_anime_dataset(img_path, BATCH_SIZE)
dataset = dataset.repeat()
db_iter = iter(dataset)

generator.build(input_shape=(None, CODE_SIZE))
discriminator.build(input_shape=(None, 64, 64, 3))
g_optimizer = optimizers.Adam(LEARNING_RATE)
d_optimizer = optimizers.Adam(LEARNING_RATE)

def g_loss_fn(generator, discriminator, batch_z):
    fake_images = generator(batch_z)
    fake_out = discriminator(fake_images)
    fake_loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=fake_out, labels=tf.ones_like(fake_out))
    return tf.reduce_mean(fake_loss)

for epoch in range(EPOCHS):
    batch_z = tf.random.uniform([BATCH_SIZE, CODE_SIZE], minval=-1, maxval=1)
    batch_x = next(db_iter)

    with tf.GradientTape() as tape:
        d_loss = d_loss_fn(generator, discriminator, batch_z, batch_x)
    grads = tape.gradient(d_loss, discriminator.trainable_variables)
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_variables))

    with tf.GradientTape() as tape:
        g_loss = g_loss_fn(generator, discriminator, batch_z)
    grads = tape.gradient(g_loss, generator.trainable_variables)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_variables))

    if epoch % 30 == 0:
        print("d-loss:", float(d_loss), "g-loss:", float(g_loss))
        z = tf.random.uniform([100, CODE_SIZE], minval=-1, maxval=1)
        images = generator(z)
        save_images(images, 'dc-gan/sampled_epoch%d.png' % epoch)
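One practical detail: Image.save does not create missing folders, so the dc-gan/ output directory has to exist before the first sampling step, otherwise the loop stops with a FileNotFoundError; a minimal guard placed before the training loop:

import os
os.makedirs('dc-gan', exist_ok=True)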

4、WGAN

This is essentially the DCGAN code above with a gradient penalty term (in the style of WGAN-GP) added to the discriminator loss; the sigmoid cross-entropy losses themselves are kept unchanged.

import glob

import numpy as np
import io

from PIL import Image
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import datasets, layers, optimizers, Sequential, metrics
from tensorflow.keras import regularizers
import multiprocessing

BATCH_SIZE = 512
CODE_SIZE = 300
LEARNING_RATE = 0.0002
EPOCHS = 2000000

def make_anime_dataset(img_paths, batch_size, resize=64, drop_remainder=True, shuffle=True, repeat=1):
    @tf.function
    def _map_fn(img):
        img = tf.image.resize(img, [resize, resize])
        img = tf.clip_by_value(img, 0, 255)
        img = img / 127.5 - 1
        return img

    dataset = disk_image_batch_dataset(img_paths,
                                       batch_size,
                                       drop_remainder=drop_remainder,
                                       map_fn=_map_fn,
                                       shuffle=shuffle,
                                       repeat=repeat)
    img_shape = (resize, resize, 3)
    len_dataset = len(img_paths) // batch_size

    return dataset, img_shape, len_dataset

def batch_dataset(dataset,
                  batch_size,
                  drop_remainder=True,
                  n_prefetch_batch=1,
                  filter_fn=None,
                  map_fn=None,
                  n_map_threads=None,
                  filter_after_map=False,
                  shuffle=True,
                  shuffle_buffer_size=None,
                  repeat=None):
    # set defaults
    if n_map_threads is None:
        n_map_threads = multiprocessing.cpu_count()
    if shuffle and shuffle_buffer_size is None:
        shuffle_buffer_size = max(batch_size * 128, 2048)  # set the minimum buffer size as 2048

    # [*] it is efficient to conduct `shuffle` before `map`/`filter` because `map`/`filter` is sometimes costly
    if shuffle:
        dataset = dataset.shuffle(shuffle_buffer_size)

    if not filter_after_map:
        if filter_fn:
            dataset = dataset.filter(filter_fn)

        if map_fn:
            dataset = dataset.map(map_fn, num_parallel_calls=n_map_threads)

    else:  # [*] this is slower
        if map_fn:
            dataset = dataset.map(map_fn, num_parallel_calls=n_map_threads)

        if filter_fn:
            dataset = dataset.filter(filter_fn)

    dataset = dataset.batch(batch_size, drop_remainder=drop_remainder)

    dataset = dataset.repeat(repeat).prefetch(n_prefetch_batch)

    return dataset

def memory_data_batch_dataset(memory_data,
                              batch_size,
                              drop_remainder=True,
                              n_prefetch_batch=1,
                              filter_fn=None,
                              map_fn=None,
                              n_map_threads=None,
                              filter_after_map=False,
                              shuffle=True,
                              shuffle_buffer_size=None,
                              repeat=None):
    """Batch dataset of memory data.

    Parameters
    ----------
    memory_data : nested structure of tensors/ndarrays/lists

    """
    dataset = tf.data.Dataset.from_tensor_slices(memory_data)
    dataset = batch_dataset(dataset,
                            batch_size,
                            drop_remainder=drop_remainder,
                            n_prefetch_batch=n_prefetch_batch,
                            filter_fn=filter_fn,
                            map_fn=map_fn,
                            n_map_threads=n_map_threads,
                            filter_after_map=filter_after_map,
                            shuffle=shuffle,
                            shuffle_buffer_size=shuffle_buffer_size,
                            repeat=repeat)
    return dataset

def disk_image_batch_dataset(img_paths,
                             batch_size,
                             labels=None,
                             drop_remainder=True,
                             n_prefetch_batch=1,
                             filter_fn=None,
                             map_fn=None,
                             n_map_threads=None,
                             filter_after_map=False,
                             shuffle=True,
                             shuffle_buffer_size=None,
                             repeat=None):
    """Batch dataset of disk image for PNG and JPEG.

    Parameters
    ----------
        img_paths : 1d-tensor/ndarray/list of str
        labels : nested structure of tensors/ndarrays/lists

    """
    if labels is None:
        memory_data = img_paths
    else:
        memory_data = (img_paths, labels)

    def parse_fn(path, *label):
        img = tf.io.read_file(path)
        img = tf.image.decode_png(img, 3)  # fix channels to 3
        return (img,) + label

    if map_fn:  # fuse `map_fn` and `parse_fn`
        def map_fn_(*args):
            return map_fn(*parse_fn(*args))
    else:
        map_fn_ = parse_fn

    dataset = memory_data_batch_dataset(memory_data,
                                        batch_size,
                                        drop_remainder=drop_remainder,
                                        n_prefetch_batch=n_prefetch_batch,
                                        filter_fn=filter_fn,
                                        map_fn=map_fn_,
                                        n_map_threads=n_map_threads,
                                        filter_after_map=filter_after_map,
                                        shuffle=shuffle,
                                        shuffle_buffer_size=shuffle_buffer_size,
                                        repeat=repeat)

    return dataset

discriminator = Sequential([
    layers.Conv2D(64, kernel_size=[5, 5], padding="same", strides=3, activation=tf.nn.leaky_relu),
    layers.BatchNormalization(),
    layers.Conv2D(128, kernel_size=[5, 5], padding="same", strides=3, activation=tf.nn.leaky_relu),
    layers.BatchNormalization(),
    layers.Conv2D(256, kernel_size=[5, 5], padding="same", strides=3, activation=tf.nn.leaky_relu),
    layers.BatchNormalization(),
    layers.Flatten(),
    layers.Dense(128, activation=tf.nn.leaky_relu),
    layers.Dense(64, activation=tf.nn.leaky_relu),
    layers.Dense(32, activation=tf.nn.leaky_relu),
    layers.Dense(16, activation=tf.nn.leaky_relu),
    layers.Dense(1)
])

generator = Sequential([
    layers.Dense(3 * 3 * 512, activation=tf.nn.leaky_relu),
    layers.Reshape((3, 3, 512)),
    # 9
    layers.Conv2DTranspose(256, 3, 3, 'valid', activation=tf.nn.leaky_relu),
    layers.BatchNormalization(),
    # (9-1)*2 + 5 = 21
    layers.Conv2DTranspose(128, 5, 2, 'valid', activation=tf.nn.leaky_relu),
    layers.BatchNormalization(),
    # (21-1)*3 + 4 = 64
    layers.Conv2DTranspose(3, 4, 3, 'valid', activation=tf.tanh),
])

def save_images(imgs, name):
    new_im = Image.new('RGB', (640, 640))

    index = 0
    for i in range(0, 640, 64):
        for j in range(0, 640, 64):
            im = imgs[index]
            im = np.array(im)
            im = ((im + 1.0) * 127.5).astype(np.uint8)
            im = Image.fromarray(im, mode='RGB')
            new_im.paste(im, (i, j))
            index += 1

    new_im.save(name)

# Compute the gradient penalty term (WGAN-GP): E[(||grad D(x_interp)||_2 - 1)^2]
def gradient_penalty(discriminator, batch_x, fake_images):
    # one random interpolation coefficient per sample
    t = tf.random.uniform([BATCH_SIZE, 1, 1, 1])
    t = tf.broadcast_to(t, batch_x.shape)
    # element-wise interpolation between real and generated images
    interplate = t * batch_x + (1 - t) * fake_images
    with tf.GradientTape() as tape:
        tape.watch([interplate])
        out = discriminator(interplate)
    grads = tape.gradient(out, interplate)
    grads = tf.reshape(grads, [BATCH_SIZE, -1])
    gp = tf.norm(grads, axis=1)
    return tf.reduce_mean((gp-1)**2)

def d_loss_fn(generator, discriminator, batch_z, batch_x):
    fake_images = generator(batch_z)
    fake_out = discriminator(fake_images)
    real_out = discriminator(batch_x)

    fake_loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=fake_out, labels=tf.zeros_like(fake_out))
    real_loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=real_out, labels=tf.ones_like(real_out))

    fake_loss = tf.reduce_mean(fake_loss)
    real_loss = tf.reduce_mean(real_loss)

    gp = gradient_penalty(discriminator, batch_x, fake_images)

    return fake_loss + real_loss + gp, gp
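Note that this keeps the sigmoid cross-entropy losses from the DCGAN version and only adds the penalty term on top. The canonical WGAN-GP formulation drops the cross-entropy and scores samples with the raw critic outputs, usually with a penalty weight around 10 and several critic updates per generator update; a minimal sketch of that variant (function names are illustrative, this is not the loss used in this run):

def wgan_gp_d_loss(generator, discriminator, batch_z, batch_x, gp_lambda=10.0):
    fake_images = generator(batch_z)
    # the critic should score real images higher than generated ones
    d_loss = tf.reduce_mean(discriminator(fake_images)) - tf.reduce_mean(discriminator(batch_x))
    gp = gradient_penalty(discriminator, batch_x, fake_images)
    return d_loss + gp_lambda * gp, gp

def wgan_gp_g_loss(generator, discriminator, batch_z):
    # the generator tries to raise the critic score of its samples
    return -tf.reduce_mean(discriminator(generator(batch_z)))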

img_path = glob.glob(r"E:\BaiduNetdiskDownload\faces\*.jpg")
dataset, image_shape, _ = make_anime_dataset(img_path, BATCH_SIZE)
dataset = dataset.repeat()
db_iter = iter(dataset)

generator.build(input_shape=(None, CODE_SIZE))
discriminator.build(input_shape=(None, 64, 64, 3))
g_optimizer = optimizers.Adam(LEARNING_RATE)
d_optimizer = optimizers.Adam(LEARNING_RATE)

def g_loss_fn(generator, discriminator, batch_z):
    fake_images = generator(batch_z)
    fake_out = discriminator(fake_images)
    fake_loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=fake_out, labels=tf.ones_like(fake_out))
    return tf.reduce_mean(fake_loss)

for epoch in range(EPOCHS):
    batch_z = tf.random.uniform([BATCH_SIZE, CODE_SIZE], minval=-1, maxval=1)
    batch_x = next(db_iter)

    with tf.GradientTape() as tape:
        d_loss, gp = d_loss_fn(generator, discriminator, batch_z, batch_x)
    grads = tape.gradient(d_loss, discriminator.trainable_variables)
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_variables))

    with tf.GradientTape() as tape:
        g_loss = g_loss_fn(generator, discriminator, batch_z)
    grads = tape.gradient(g_loss, generator.trainable_variables)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_variables))

    if epoch % 30 == 0:
        print("d-loss:", float(d_loss), "g-loss:", float(g_loss), "gp", float(gp))
        z = tf.random.uniform([100, CODE_SIZE], minval=-1, maxval=1)
        images = generator(z)
        save_images(images, 'w-gan/sampled_epoch%d.png' % epoch)
    if epoch % 100 == 0:
        discriminator.save_weights("model-wgan/w-gan-d")
        generator.save_weights("model-wgan/w-gan-g")
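To reuse the checkpoints written above (for resuming training or just sampling), the weights can be loaded back once the models have been built with the same input shapes; a short sketch:

generator.build(input_shape=(None, CODE_SIZE))
discriminator.build(input_shape=(None, 64, 64, 3))
generator.load_weights("model-wgan/w-gan-g")
discriminator.load_weights("model-wgan/w-gan-d")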
