DCGAN - 코드를 통한 이해

조영재·2021년 9월 1일
0

DCGAN

목록 보기
3/3

앞서 설명했던 DCGAN의 구조를 코드를 분석하며 설명합니다.

  1. Dataset 준비
  2. Model 구조 정의
  3. Train
  4. Eval

의 순서로 진행합니다.

Dataset 준비

Dataset.py

import os
from zipfile import ZipFile

from tensorflow import keras
import matplotlib.pyplot as plt
import gdown

def get_celeba_data():
    if not os.path.isdir("celeba_gan"):
        os.makedirs("celeba_gan")

    if not os.path.isdir("celeba_gan/img_align_celeba"):
        url = "https://drive.google.com/uc?id=1O7m1010EJjLE5QxLZiM9Fpjs7Oj6e684"
        output = "celeba_gan/data.zip"
        gdown.download(url, output, quiet=True)

        with ZipFile("celeba_gan/data.zip", "r") as zipobj:
            zipobj.extractall("celeba_gan")

    dataset = keras.preprocessing.image_dataset_from_directory(
        "celeba_gan", label_mode=None, image_size=(64, 64), batch_size=32
    )
    dataset = dataset.map(lambda x: x / 255.0)
    return dataset

def show_one_in_list(dataset):
    for x in dataset:
        plt.axis("off")
        plt.imshow((x.numpy() * 255).astype("int32")[0])
        plt.show()
        break

if __name__ == "__main__":
    dataset = get_celeba_data()
    show_one_in_list(dataset)

get_celeba_data
Celeb_A 데이터를 다운로드하고, 적절하게 전처리한 dataset을 반환합니다.

keras.preprocessing.image_dataset_from_directory
지정한 폴더 내의 모든 이미지 파일을 전처리합니다.
image_size로 Resize하고, batch_size만큼 나누어 iteratable 한 객체를 생성합니다.
아래 API를 참고하여 더 자세하게 알 수 있습니다.
https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/image_dataset_from_directory

show_one_in_list
테스트를 위해 작성한 함수로, 전처리한 dataset에서 한장의 이미지를 뽑아 확인합니다.


모델 구조 정의

Model.py

Library

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

make_discriminator

def make_discriminator():
    discriminator = keras.Sequential(
    [
        keras.Input(shape=(64, 64, 3)),
        layers.Conv2D(64, kernel_size=4, strides=2, padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, kernel_size=4, strides=2, padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, kernel_size=4, strides=2, padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Flatten(),
        layers.Dropout(0.2),
        layers.Dense(1, activation="sigmoid"),
    ],
    name="discriminator",
    )
    discriminator.summary()
    return discriminator

keras.Sequential 모델을 생성합니다.

Conv2D(filters, kernel_size, strides, padding)
Convolution Layer를 추가합니다.

  • filters: 출력의 채널 크기를 지정합니다. (nums, width, height, filters)
  • kernel_size: 합성곱 행렬의 크기를 지정합니다.
  • stides: 합성곱 행렬의 1 Step Size를 지정합니다.
  • padding: same → Input의 각 변에 padding을 추가합니다. 이 때, 입력의 width, height와 출력의 width, height가 같아집니다.

LeakyReLU(alpha) LeakyReLU 활성화 함수를 추가합니다.

  • alpha: x < 0 일 때, 기울기를 지정합니다. ( x ≥ 0 일 때는 x = y)

make_generator

def make_generator():
    latent_dim = 128

    generator = keras.Sequential(
        [
            keras.Input(shape=(latent_dim,)),
            layers.Dense(8 * 8 * 128),
            layers.Reshape((8, 8, 128)),
            layers.Conv2DTranspose(128, kernel_size=4, strides=2, padding="same"),
            layers.LeakyReLU(alpha=0.2),
            layers.Conv2DTranspose(256, kernel_size=4, strides=2, padding="same"),
            layers.LeakyReLU(alpha=0.2),
            layers.Conv2DTranspose(512, kernel_size=4, strides=2, padding="same"),
            layers.LeakyReLU(alpha=0.2),
            layers.Conv2D(3, kernel_size=5, padding="same", activation="sigmoid"),
        ],
        name="generator",
    )
    generator.summary()
    return generator

Conv2DTranspose(filters, kernel_size, strides, padding)
fractionally strided convolution Layer를 추가합니다.
Conv2D와 인자의 역할은 같습니다.

GAN class

전체 내용은 아래와 같습니다.

class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn
        self.d_loss_metric = keras.metrics.Mean(name="d_loss")
        self.g_loss_metric = keras.metrics.Mean(name="g_loss")

    @property
    def metrics(self):
        return [self.d_loss_metric, self.g_loss_metric]

    def train_step(self, real_images):
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        # Update metrics
        self.d_loss_metric.update_state(d_loss)
        self.g_loss_metric.update_state(g_loss)
        return {
            "d_loss": self.d_loss_metric.result(),
            "g_loss": self.g_loss_metric.result(),
        }

Declare

class GAN(keras.Model):

keras.Model Class를 상속합니다.

Init

def __init__(self, discriminator, generator, latent_dim):
    super(GAN, self).__init__()
    self.discriminator = discriminator
    self.generator = generator
    self.latent_dim = latent_dim

super(GAN, self).__init__()
상속받은 keras.Model Class의 생성자를 실행합니다.
discriminatorgenerator와 latent_dim을 인자로 받아 클래스의 프로퍼티로 추가합니다.

Compile

def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn
        self.d_loss_metric = keras.metrics.Mean(name="d_loss")
        self.g_loss_metric = keras.metrics.Mean(name="g_loss")

super(GAN, self).compile()
상속받은 keras.Model Class의 compile 함수를 실행합니다.

compile에 필요한 변수를 받아 프로퍼티로 추가합니다.

keras.metrics.Mean
Input 값에 가중치를 곱하여 평균치를 계산합니다.
https://www.tensorflow.org/api_docs/python/tf/keras/metrics/Mean

metrics

@property
def metrics(self):
    return [self.d_loss_metric, self.g_loss_metric]

@property
property decorator로 메소드를 속성으로 선언합니다.
gan.metrics 처럼 메소드 대신 프로퍼티 처럼 호출 할 수 있습니다.

compile에서 선언한 d_loss_metricg_loss_metric 속성을 List로 반환합니다.

train_step

def train_step(self, real_images):

		######################## step one ###############################
    # Sample random points in the latent space
    batch_size = tf.shape(real_images)[0]
    random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

    # Decode them to fake images
    generated_images = self.generator(random_latent_vectors)

    # Combine them with real images
    combined_images = tf.concat([generated_images, real_images], axis=0)

    # Assemble labels discriminating real from fake images
    labels = tf.concat(
        [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
    )
    # Add random noise to the labels - important trick!
    labels += 0.05 * tf.random.uniform(tf.shape(labels))

		######################## step two ###############################
    # Train the discriminator
    with tf.GradientTape() as tape:
        predictions = self.discriminator(combined_images)
        d_loss = self.loss_fn(labels, predictions)
    grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
    self.d_optimizer.apply_gradients(
        zip(grads, self.discriminator.trainable_weights)
    )

    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

    # Assemble labels that say "all real images"
    misleading_labels = tf.zeros((batch_size, 1))

    # Train the generator (note that we should *not* update the weights
    # of the discriminator)!
    with tf.GradientTape() as tape:
        predictions = self.discriminator(self.generator(random_latent_vectors))
        g_loss = self.loss_fn(misleading_labels, predictions)
    grads = tape.gradient(g_loss, self.generator.trainable_weights)
    self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

    # Update metrics
    self.d_loss_metric.update_state(d_loss)
    self.g_loss_metric.update_state(g_loss)
    return {
        "d_loss": self.d_loss_metric.result(),
        "g_loss": self.g_loss_metric.result(),
    }

Step One

batch_size 입력으로 받은 real_images의 개수를 저장합니다.

random_latent_vectors batch_size 만큼 generator에 입력할 latent_vector를 생성합니다.
ex) batch_size=512, latent_vector=128 일 때, (512, 128) 크기의 random vector 생성

generated_images generator에 latent_vector를 입력하여 가짜 이미지를 생성합니다.

combined_images 실제 이미지가짜 이미지를 연결하여 하나의 텐서로 만듭니다.

labels 실제 이미지가짜 이미지는 모두 batch_size만큼 있습니다. 실제 이미지에는 1을, 가짜 이미지에는 0을 할당 할 텐서를 만듭니다.

labels += 0.05 * tf.random.uniform(tf.shape(labels))
labels는 현재 0 또는 1의 값을 가지고 있습니다. 여기에 0.05*Random(0~1) 값을 더하여 약간씩 다르게합니다. discriminator의 학습에만 사용되는데 Noise를 추가함으로 판별능력을 키울 수 있는 것 같습니다.
매우 중요한 트릭이라고하는데, 어떻게 성능을 향상 시킬 수 있는지 이유는 모르겠네요.

Step Two

이전 GAN 의 가중치 갱신부와 같습니다.
다만, 이전에 선언한 labels 는 discriminator에 새로 선언한 정수만을 가지고 있는 misleading_labels 는 generator의 학습에 사용된다는 점이 다릅니다.

d_loss와 g_loss를 반환하고 함수를 종료합니다.

GANMonitor

class GANMonitor(keras.callbacks.Callback):
    def __init__(self, start_epoch=0, num_img=3, latent_dim=128):
        self.start_epoch = start_epoch
        self.num_img = num_img
        self.latent_dim = latent_dim

    def on_epoch_end(self, epoch, logs=None):
        cur_epoch = epoch + self.start_epoch
        random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))
        generated_images = self.model.generator(random_latent_vectors)
        generated_images *= 255
        generated_images.numpy()
        for i in range(self.num_img):
            img = keras.preprocessing.image.array_to_img(generated_images[i])
            img.save("results/generated_img_%03d_%d.png" % (cur_epoch, i))

        self.model.generator.save('./models/generator/%d' % (cur_epoch))
        self.model.discriminator.save('./models/discriminator/%d' % (cur_epoch))

keras.callbacks.Callback
콜백 객체를 생성합니다. 아래 API에 명시된 함수를 작성하면, 지정한 학습과정에서 콜백함수를 호출 할 수 있습니다.
위 코드에서는 on_epoch_end 즉, epoch가 끝날 때 마다 함수를 실행합니다.
https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/Callback

Epoch이 끝날 때 마다 Latent Vectors를 생성하여 가짜 이미지를 생성하고 저장합니다.
Generator가 이미지를 잘 생성하는지 확인 하기위해 사용합니다.


Train

이전까지 작성한 모듈을 불러와 Dataset Load와 Model 선언을 진행합니다.

지정된 파라미터로 학습을 진행합니다.

train.py

Import Library

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt
import os

from model import make_discriminator, make_generator, GAN, GANMonitor 
from dataset import get_celeba_data

load_model

def load_model(path, role):
    model_num = -1
    list_path = path + '/' + role
    for num in os.listdir(list_path):
        if int(num) > model_num:
            model_num = int(num)
    
    if model_num == -1:
        if role == "discriminator":
            return make_discriminator()
        elif role == "generator":
            return make_generator()
        else:
            raise "invalid argument error"
    else:
        model_path = list_path + '/' + str(model_num)
        return keras.models.load_model(model_path)

모델을 저장 할 때, Epoch에 따라 1, 2, 3, 4, ... 순으로 저장하고 있습니다. 가장 최근의 모델을 불러와 반환합니다. 만약 모델이 없다면, 새로 생성하여 반환합니다.

main

if __name__ == "__main__":
    dataset = get_celeba_data()

    discriminator = load_model('./models', "discriminator")
    generator = load_model('./models', "generator")
    
    latent_dim = 128
    start_epoch = 11
    epochs = 30  # In practice, use ~100 epochs

    gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
    gan.compile(
        d_optimizer=keras.optimizers.Adam(learning_rate=0.0001),
        g_optimizer=keras.optimizers.Adam(learning_rate=0.0001),
        loss_fn=keras.losses.BinaryCrossentropy(),
    )
    
    gan.fit(
        dataset, epochs=epochs, callbacks=[GANMonitor(start_epoch=start_epoch, num_img=10, latent_dim=latent_dim)]
    )

학습을 실행합니다.

생성 이미지

1 Epoch

사람의 형상은 있는데, 매우 불안정한 이미지를 생성합니다.

10 Epoch

이전보다 더 뚜렷해졌으나, 컬러가 비슷비슷하고 노이즈가 많습니다.

30 Epoch

다양한 색을 표현하고 이목구비를 확인 할 수 있게 되었습니다. 그래도 아직 공간이 일그러지거나 흐릿한 느낌이 있네요.

결론

1 Epoch에 약 16분의 시간이 걸려 30 Epcoh 까지 약 8시간이 소요되었습니다. 사람의 형상을 어느정도 알아 볼 수 있는 이미지를 생성하지만, 아직 프로덕트에 쓸정도로 자연스러운 이미지를 생성하지는 못합니다.

profile
Be Good Developer

0개의 댓글