TensorFlow で実装しながら頭を整理
TensorFlow公式にチュートリアルがあるので、これに沿いながら
Convolutional Variational Autoencoder | TensorFlow Core
モジュールインポート
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
データセットロード
いつもの MNIST を使う
# Dataset sizes and batching configuration.
train_size = 60000
batch_size = 32
test_size = 10000

# Load MNIST, scale to [0, 1], add a trailing channel axis (28, 28) -> (28, 28, 1),
# then binarize at 0.5 — the decoder models pixels as Bernoulli variables.
(x_train, t_train), (x_test, t_test) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_train = x_train.reshape(x_train.shape + (1,))
x_train = np.where(x_train > .5, 1.0, 0.0).astype('float32')
x_test = x_test.astype('float32') / 255.
x_test = x_test.reshape(x_test.shape + (1,))
x_test = np.where(x_test > .5, 1.0, 0.0).astype('float32')

train_dataset = tf.data.Dataset.from_tensor_slices(x_train).shuffle(train_size).batch(batch_size)
# Fix: the test pipeline previously used train_size (60000) as the shuffle
# buffer; use test_size so the buffer matches the test set's actual length.
test_dataset = tf.data.Dataset.from_tensor_slices(x_test).shuffle(test_size).batch(batch_size)
ネットワーク部分の定義クラス
Encoderネットワーク:畳み込みを2層分+1層
Decoderネットワーク:Encoderと同様に
class CVAE(tf.keras.Model):
    """Convolutional Variational Autoencoder for 28x28x1 binarized MNIST.

    The encoder maps an image to the mean and log-variance of a diagonal
    Gaussian over a `latent_dim`-dimensional latent space; the decoder maps
    a latent vector back to per-pixel logits.
    """

    def __init__(self, latent_dim):
        super(CVAE, self).__init__()
        self.latent_dim = latent_dim
        # Encoder: two stride-2 convolutions (28 -> 13 -> 6 spatially with
        # 'valid' padding), flatten, then a dense layer emitting the mean
        # and log-variance concatenated along the feature axis.
        self.encoder = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(filters=32, kernel_size=3, strides=(2, 2), activation='relu'),
            tf.keras.layers.Conv2D(64, 3, strides=(2, 2), activation=tf.nn.relu),
            tf.keras.layers.Flatten(),
            # latent_dim values for the mean + latent_dim for the log-variance.
            tf.keras.layers.Dense(latent_dim + latent_dim)
        ])
        # Decoder: dense -> reshape -> transposed convolutions back up to
        # (28, 28, 1) logits (no final activation; sigmoid applied on demand).
        self.decoder = tf.keras.Sequential([
            # Fix: was input_shape=(latent_dim) — parenthesized int, not a
            # tuple; it only worked because TensorShape tolerates a scalar.
            tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
            tf.keras.layers.Dense(units=7 * 7 * 32, activation=tf.nn.relu),
            tf.keras.layers.Reshape(target_shape=(7, 7, 32)),
            tf.keras.layers.Conv2DTranspose(64, 3, strides=2, padding='same', activation='relu'),
            tf.keras.layers.Conv2DTranspose(32, 3, strides=2, padding='same', activation='relu'),
            tf.keras.layers.Conv2DTranspose(1, 3, strides=1, padding='same')
        ])

    @tf.function
    def sample(self, eps=None):
        """Decode latent vectors `eps` into pixel probabilities; draws 100
        standard-normal latents when `eps` is None."""
        if eps is None:
            eps = tf.random.normal(shape=(100, self.latent_dim))
        return self.decode(eps, apply_sigmoid=True)

    def encode(self, x):
        """Return (mean, logvar) of the approximate posterior q(z|x)."""
        mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
        return mean, logvar

    def reparameterize(self, mean, logvar):
        """Sample z ~ q(z|x) via the reparameterization trick:
        z = mean + eps * std, with eps ~ N(0, I)."""
        eps = tf.random.normal(shape=mean.shape)
        return eps * tf.exp(logvar * .5) + mean

    def decode(self, z, apply_sigmoid=False):
        """Map latents `z` to pixel logits, or probabilities when
        `apply_sigmoid` is True."""
        logits = self.decoder(z)
        if apply_sigmoid:
            probs = tf.sigmoid(logits)
            return probs
        return logits
定義したネットワークを確認
# Build the CVAE with a 2-dimensional latent space and print both summaries.
latent_dim = 2
model = CVAE(latent_dim)
model.encoder.summary()
model.decoder.summary()
Model: "sequential"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
conv2d (Conv2D) (None, 13, 13, 32) 320
_________________________________________________________________
conv2d_1 (Conv2D) (None, 6, 6, 64) 18496
_________________________________________________________________
flatten (Flatten) (None, 2304) 0
_________________________________________________________________
dense (Dense) (None, 4) 9220
=================================================================
Total params: 28,036
Trainable params: 28,036
Non-trainable params: 0
_________________________________________________________________
Model: "sequential_1"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
dense_1 (Dense) (None, 1568) 4704
_________________________________________________________________
reshape (Reshape) (None, 7, 7, 32) 0
_________________________________________________________________
conv2d_transpose (Conv2DTran (None, 14, 14, 64) 18496
_________________________________________________________________
conv2d_transpose_1 (Conv2DTr (None, 28, 28, 32) 18464
_________________________________________________________________
conv2d_transpose_2 (Conv2DTr (None, 28, 28, 1) 289
=================================================================
Total params: 41,953
Trainable params: 41,953
Non-trainable params: 0
_________________________________________________________________
(None, None)
このチュートリアルでは損失関数をアレンジしている模様
これをそのまま採用してみる
optimizer = tf.keras.optimizers.Adam(1e-4)
def log_normal_pdf(sample, mean, logvar, raxis=1):
    """Log-density of `sample` under a diagonal Gaussian N(mean, exp(logvar)),
    summed over axis `raxis`."""
    log2pi = tf.math.log(2. * np.pi)
    squared_err = (sample - mean) ** 2.
    per_dim = -.5 * (squared_err * tf.exp(-logvar) + logvar + log2pi)
    return tf.reduce_sum(per_dim, axis=raxis)
def compute_loss(model, x):
    """Single-sample Monte Carlo estimate of the negative ELBO:
    -E[log p(x|z) + log p(z) - log q(z|x)], averaged over the batch."""
    mean, logvar = model.encode(x)
    z = model.reparameterize(mean, logvar)
    x_logit = model.decode(z)
    # Bernoulli reconstruction term, summed over height/width/channel.
    cross_loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
    logpx_z = -tf.reduce_sum(cross_loss, axis=[1, 2, 3])
    # Prior p(z) = N(0, I) and approximate posterior q(z|x).
    logpz = log_normal_pdf(z, 0., 0.)
    logqz_x = log_normal_pdf(z, mean, logvar)
    return -tf.reduce_mean(logpx_z + logpz - logqz_x)
@tf.function
def train_step(model, x, optimizer):
    """One gradient-descent step on a batch `x`; returns the batch loss."""
    with tf.GradientTape() as tape:
        loss = compute_loss(model, x)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss
生成データの画像保存関数
def generate_and_save_images(model, epoch, test_sample):
    """Reconstruct `test_sample` (assumes 16 images — 4x4 grid; TODO confirm),
    save the grid under output/, and show it."""
    mean, logvar = model.encode(test_sample)
    z = model.reparameterize(mean, logvar)
    predictions = model.sample(z)
    plt.figure(figsize=(4, 4))
    for i in range(predictions.shape[0]):
        plt.subplot(4, 4, i + 1)
        plt.imshow(predictions[i, :, :, 0], cmap='gray')
        plt.axis('off')
    # Fix: savefig raises if the target directory does not exist.
    os.makedirs('output', exist_ok=True)
    plt.savefig('output/image_at_epoch_epoch{:04d}.png'.format(epoch))
    plt.show()
学習前の生成画像を一枚保存
シャッフルされたデータセットから 16枚抜き出している
# Grab one fixed batch of 16 test images to visualize progress with.
num_examples_to_generate = 16
assert batch_size >= num_examples_to_generate
for test_batch in test_dataset.take(1):
    test_sample = test_batch[:num_examples_to_generate]
generate_and_save_images(model, 0, test_sample)
学習前なので、当然わけのわからない画像が生成される
10 エポック分トレーニング回してみる
epochs = 10
for epoch in range(1, epochs + 1):
    # Fix: loop variables previously reused the names x_train / x_test,
    # clobbering the module-level arrays; use distinct batch names.
    start_time = time.time()
    for train_batch in train_dataset:
        train_step(model, train_batch, optimizer)
    end_time = time.time()

    # Evaluate the mean ELBO on the test set after each epoch.
    loss = tf.keras.metrics.Mean()
    for test_batch in test_dataset:
        loss(compute_loss(model, test_batch))
    elbo = -loss.result()
    # Fix: the message announced an elapsed time but never printed one.
    print('Epoch: {}, Test set ELBO: {}, time elapse for current epoch: {}'.format(
        epoch, elbo, end_time - start_time))
    generate_and_save_images(model, epoch, test_sample)
エポック 1回目から何かそれっぽい画像が生成され始める
エポック 10回目で生成されたデータ