AutoEncoder, Latent(잠재) 벡터, DAE(Denoising AutoEncoder), 이상치 탐지(Anomaly Detection)
오늘은 배운 개념이 매우 심플하다.
AutoEncoder (오토인코더)
는 입력 데이터를 저차원의 벡터로 압축한 뒤 원래 크기의 데이터로 복원하는 신경망이다. 이미지를 보자. U-net
구조랑 비슷하게 보인다.code
부분이다. 저 벡터를 Latent vector (잠재 벡터)라고 하는데, 원본 데이터보다 차원이 작으면서도 원본 데이터의 특징을 잘 보존하고 있는 벡터라는 의미가 있다. AutoEncoder의 대표적인 활용은 다음 세 가지가 있다.
중요!!
LATENT_DIM = 64
class Autoencoder(Model):
def __init__(self, latent_dim):
super(Autoencoder, self).__init__()
self.latent_dim = latent_dim
self.encoder = tf.keras.Sequential([
layers.Flatten(),
layers.Dense(latent_dim, activation='relu'),
])
self.decoder = tf.keras.Sequential([
layers.Dense(784, activation='sigmoid'),
layers.Reshape((28, 28))
])
def call(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
model = Autoencoder(LATENT_DIM)
model.compile(optimizer='adam', loss='mse')
model.fit(x_train, x_train,
epochs=10,
shuffle=True,
validation_data=(x_test, x_test))
x_train = x_train[..., tf.newaxis]
과 같이 훈련/검증 데이터에 임의의 채널을 하나더 추가해서 노이즈를 주어야 한다.AutoEncoder는 특정 데이터의 중요 특징, 즉 잠재 벡터를 바탕으로 다시 원본 데이터로 복원할 때에 발생하는 오류, 즉 복원 오류(Reconstruction Error)를 최소화 하도록 훈련됩니다.
정상 데이터로만 훈련한 뒤에 비정상 데이터셋을 복원한다면 복원 오류가 커질 것입니다. 복원 오류가 특정한 임계값을 초과하는 경우 해당 데이터를 비정상으로 판단할 수 있습니다.
이것도 코드 참고 필요시 노트나 구글링 해볼 것. 여기 옮겨두진 않겠다.
오늘은 개념학습보다 실습 코드 적고 이해하는 데에 더 많은 시간과 어려움이 있었던 것 같다.
구글의 Quick Draw 데이터셋 중 고양이'를 스케치한 데이터를 활용한다.
- (손 낙서 데이터는 벡터 포맷으로 저장되어 있어서 비트맵 형태로 변환하여 사용해야 한다.)
이번 과제에서는 '고양이 스러움'을 학습하는 오토인코더 모델을 만들게 됩니다.
오토인코더의 Latent Vector가 고양이라는 스케치의 표현(Representation)을 담아낼 수 있도록 학습해보도록 합시다.
내가 공부하며 달아놓은 주석은 #=>
와 같이 적거나 아예 여러 줄 주석처리를 해서 기록해두었다.
[데이터 불러오기]
import matplotlib.pyplot as plt
import json, glob
import numpy as np
---
from tensorflow.keras.utils import get_file
BASE_PATH = 'https://storage.googleapis.com/quickdraw_dataset/full/binary/'
path = get_file('cat', BASE_PATH + 'cat.bin')
---
import PIL # => 파이썬에서 이미지 분석/처리를 쉽게 할 수 있도록 도와주는 라이브러리임. (PIL = Python Imaging Library) - pilow module
from PIL import ImageDraw
from struct import unpack
from sklearn.model_selection import train_test_split
import tensorflow as tf
def load_drows(path, train_size=0.85):
"""
데이터를 불러오는 역할을 하는 함수입니다.
"""
x = []
# 파일을 불러와 스케치를 하나하나 모읍니다.
# 스케치가 15바이트 헤더로 시작하기 때문이 이런 부분을 전처리하여 줍니다.
with open(path, 'rb') as f:
while True:
img = PIL.Image.new('L', (32, 32), 'white') # => PIL.Image.new(mode, size, color=0) // Creates a new image with the given mode and size. // L (8-bit pixels, black and white) 이외는 공식문서 참고하면 됨. https://pillow.readthedocs.io/en/stable/reference/Image.html
draw = ImageDraw.Draw(img) # => 공식문서 https://pillow.readthedocs.io/en/stable/reference/ImageDraw.html?highlight=from%20PIL%20import%20ImageDraw
header = f.read(15)
if len(header) != 15:
break
# 낙서는 x,y 좌표로 구성된 획(stroke) 목록으로 되어 있고, 각 좌표는 분리되어 저장되어 있습니다.
# 방금 위에서 생성한 ImageDraw 객체의 좌표 목록을 이용하기 위해 zip() 함수를 사용하여 합쳐주도록 합니다.
strokes, = unpack('H', f.read(2)) # => 참고 https://docs.python.org/ko/3/library/struct.html
for i in range(strokes):
n_points, = unpack('H', f.read(2))
fmt = str(n_points) + 'B'
read_scaled = lambda: (p // 8 for
p in unpack(fmt, f.read(n_points)))
points = [*zip(read_scaled(), read_scaled())]
draw.line(points, fill=0, width=2)
img = tf.keras.preprocessing.image.img_to_array(img) # => img_to_array(img)에서 tf.keras.preprocessing.image.img_to_array(img)로 코드 변경함. 공식문서: https://www.tensorflow.org/api_docs/python/tf/keras/utils/img_to_array
x.append(img)
x = np.asarray(x) / 255
return train_test_split(x, train_size=train_size)
# 입력받은 10만개의 고양이 낙서 데이터를 활용할 수 있다.
x_train, x_test = load_drows(path)
print(x_train.shape, x_test.shape) #=> (104721, 32, 32, 1) (18481, 32, 32, 1)
[오토 인코더 구축]
from re import X
from keras.layers import Input, Dense, Conv2D, MaxPooling2D, UpSampling2D, Reshape, Concatenate, Flatten, Lambda
from keras.models import Model
from keras.preprocessing.image import load_img, img_to_array, ImageDataGenerator
from keras.losses import binary_crossentropy, kullback_leibler_divergence
from keras import backend as K
def create_AE():
input_img = Input(shape=(32, 32, 1))
channels = 2
x = input_img
for i in range(4):
channels *= 2 # => channels = channels * 2 = 4 // 변수 참고 https://corytips.tistory.com/162
# 여기에 다운샘플링 할 수 있는 코드를 제작하세요.
# 구조는 아래 첨부된 이미지를 참조하세요.
# 사용할 함수 : Conv2D(activation='relu', padding='same'), Concatenate(), MaxPooling2D(padding='same')
x_1 = Conv2D(channels, (3,3), activation='relu', padding='same')(x) # => 채널은 필터수 인거 잊지 말자 // "same" 은 아웃풋이 원래 인풋과 동일한 길이를 갖도록 인풋을 패딩한다. (https://keras.io/ko/layers/convolutional/)
x_2 = Conv2D(channels, (2,2), activation='relu', padding='same')(x)
concatenate = Concatenate()([x_1, x_2]) # => Concatenate is used in a Sequential model, whereas concatenate is used in a Functional API이라곤 하는데, 이런 형태로 함수 뒤에 붙이니 되긴 하네. 출처: https://stackoverflow.com/questions/44720822/valueerror-with-concatenate-layer-keras-functional-api
x = MaxPooling2D(2, padding='same')(concatenate)
x = Dense(channels)(x)
for i in range(4):
# 여기에 업샘플링할 수 있는 코드를 제작 하세요.
# 사용할 함수 : Conv2D(activation='relu', padding='same'), UpSampling2D()
x = Conv2D(channels, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
channels //= 2 # => Channel을 2로 나눈 몫이 channel이 됨.
decoded = Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x)
autoencoder = Model(input_img, decoded)
autoencoder.compile(optimizer='adadelta', loss='binary_crossentropy')
return autoencoder
autoencoder = create_AE()
autoencoder.summary()
'''
[기록 - 인코더 부분]
난 처음에 인코더 부분을 아래와 같이 직접 짜보고 이걸 for 문으로 어떻게 줄일 수 있을지 고민해보았다.
(for문의 i가 어떤 역할을 하는지, 어떻게 코드에 반영해야할지 고민을 많이 해봤는데, 도저히 들어갈 곳이 없어 찾아보니 그냥 똑같은 과정을 4번 반복한다는 의미로 쓸 수 있음을 배웠다.)
conv2d = Conv2D(4, (3,3), activation='relu', padding='same')(x) # => "same" 은 아웃풋이 원래 인풋과 동일한 길이를 갖도록 인풋을 패딩한다. (https://keras.io/ko/layers/convolutional/)
conv2d_1 = Conv2D(4, (3,3), activation='relu', padding='same')(conv2d)
concatenate = Concatenate()([conv2d, conv2d_1]) # => Concatenate is used in a Sequential model, whereas concatenate is used in a Functional API이라곤 하는데, 이런 형태로 함수 뒤에 붙이니 되긴 하네. 출처: https://stackoverflow.com/questions/44720822/valueerror-with-concatenate-layer-keras-functional-api
maxpool = MaxPooling2D((2,2), padding='same')(concatenate)
conv2d_2 = Conv2D(8, (3,3), activation='relu', padding='same')(maxpool)
conv2d_3 = Conv2D(8, (3,3), activation='relu', padding='same')(conv2d_2)
concatenate_2 = Concatenate()([conv2d_2, conv2d_3])
maxpool_2 = MaxPooling2D((2,2), padding='same')(concatenate_2)
conv2d_4 = Conv2D(16, (3,3), activation='relu', padding='same')(maxpool_2)
conv2d_5 = Conv2D(16, (3,3), activation='relu', padding='same')(conv2d_4)
concatenate_3 = Concatenate()([conv2d_4, conv2d_5])
maxpool_3 = MaxPooling2D((2,2), padding='same')(concatenate_3)
conv2d_6 = Conv2D(32, (3,3), activation='relu', padding='same')(maxpool_3)
conv2d_7 = Conv2D(32, (3,3), activation='relu', padding='same')(conv2d_6)
concatenate_4 = Concatenate()([conv2d_6, conv2d_7])
maxpool_4 = MaxPooling2D((2,2), padding='same')(concatenate_4)
-근데 이전 구조에 누적하는 위와 같은 방식이 아니라는 건, 문제에서 이미지로 주어진 모델 summary의 맨처음 conv2d 층이 'connected to'를 보면 둘다 'input_1[0][0]'인 것을 보고 동일한 인풋 데이터에서 합성곱을 해 concat한다는 걸 알았다.
-x_1과 x_2의 필터 사이즈가 다른 건 'Param #'으로 계산했다. 간단하게는 3*3*4 = 36 + 4 = 40 (편향 4개 포함) 이런 식으로 해도 되고, 아니면 이 수식을 참고해도 된다. (param_number = output_channel_number * (input_channel_number * kernel_height * kernel_width + 1))을 계산해보면 된다. 출처 - https://towardsdatascience.com/how-to-calculate-the-number-of-parameters-in-keras-models-710683dae0ca
-그리고 channel(필터 수)도 for loop에 의해 4, 8, 16, 32로 늘어나게 된다!!
[기록 - 디코더]
- 마찬가지로 디코더의 channel(필터 수)도 for loop에 의해 줄어듬. 위에서 채널 최종이 32였으니까 32, 16, 8, 4로 줄어드는 거임. outshape은 그런 의미로 이해하면 됨.
- 채널 수 줄이면서 업샘플링한다는 의미로 디코더는 생각하면 될 것 같다. 참고로 upsampling2D()는 크기를 단순하게 늘리는 것이다. Transposed Conv는 가중치 학습이 이뤄지지만 upsampling2D()은 학습이 이뤄지는 부분이 없음.
- 근데 필터가 (3,3)인건 어떻게 미리 알 수 있는거지..? 최종적으로 (3,3)으로 빼주니 그런 걸로 생각하면 되겠지? ++ 커널 사이즈를 짝수가 아니라 홀수로 빼는 건 이미지 가운데가 어디인지 명확히 알 수 있기 때문이라고 함. (예를 들어 2 by 2면 정 가운데가 애매해지니까.)
- 마지막에 sigmoid를 준 것은 흑백이미지라서가 아니라 픽셀을 0-1 사이의 값으로 달라는 거임. (앞에서 정규화 해줬으니) 만약 인풋으로 컬러 이미지를 넣었다면 앞의 채널 수만 3으로 바꾸는 거고 활성함수는 동일하게 시그모이드임.
'''
[모델 학습]
from keras.callbacks import TensorBoard
autoencoder.fit(x_train, x_train,
epochs=100,
batch_size=128,
shuffle=True,
validation_data=(x_test, x_test),
callbacks=[TensorBoard(log_dir='/tmp/autoencoder')])
[시각화]
cols = 25
idx = np.random.randint(x_test.shape[0], size=cols)
sample = x_test[idx]
decoded_imgs = autoencoder.predict(sample)
decoded_imgs.shape
from keras.layers import Reshape, Concatenate, Flatten, Lambda, Input, Dense, Conv2D, MaxPooling2D, UpSampling2D
from IPython.display import Image, display
from io import BytesIO
def decode_img(tile, factor=1.0):
tile = tile.reshape(tile.shape[:-1])
tile = np.clip(tile * 255, 0, 255)
return PIL.Image.fromarray(tile)
overview = PIL.Image.new('RGB', (cols * 32, 64 + 20), (128, 128, 128))
for idx in range(cols):
overview.paste(decode_img(sample[idx]), (idx * 32, 5))
overview.paste(decode_img(decoded_imgs[idx]), (idx * 32, 42))
f = BytesIO()
overview.save(f, 'png')
display(Image(data=f.getvalue()))