[텐서플로] 모델링 / 학습 / Tensorboard / 모델 저장 및 불러오기 / 데이터 다루기

황성미·2023년 11월 5일
0
post-thumbnail

✍🏻 3일 공부 이야기.


오늘 학습한 실습 코드는 위 깃허브 사진 클릭 시 이동합니다 :)



👀 이전 시간 리마인드

딥러닝 프로젝트를 하기 위해 위 4가지는 꼭 할 줄 알아야한다고 했었구

프로젝트는 크게 총 4단계로 구성되어있다고 했었다!

프로젝트의 흐름을 다루는 방법을 이어서 살펴보자 :)

Modeling

📌 텐서플로에서 모델링 하는 방법

  1. Sequential
  2. Functional API model
  3. Sub class model(자유도가 가장 높으나 코드가 길어짐)

1번부터 다시 살펴보자.

Sequential

VGGNet

  • tf.keras.layers.Conv2D
  • tf.keras.layers.Activation
  • tf.keras.layers.MaxPool2D
  • tf.keras.layers.Flatten
  • tf.keras.layers.Dense

이번 시간에는 CNN의 VGGNet 모델을 구현해보자.

CNN은 원래 이미지 데이터 중에서도 컬러 이미지 데이터를 입력으로 받는 모델이었기 때문에 입력 데이터의 shape이 (num_data, 28, 28, 3)과 같은 형태였다.

우리가 앞서 만들었던 Dataloader클래스는 (num_data, 28, 28)의 입력을 받았기 때문에 채널수를 입력받을 수 있도록 차원을 늘려주는 작업이 필요하다.

class DataLoader():
    def __init__(self):
        # data load
        (self.train_x, self.train_y), \
            (self.test_x, self.test_y) = tf.keras.datasets.mnist.load_data()

    def scale(self, x):
        return (x / 255.0).astype(np.float32)

    def preprocess_dataset(self, dataset):

        (feature, target) = dataset

        # scaling #
        scaled_x = np.array([self.scale(x) for x in feature])

        ###### Add channel axis ######
        expanded_x = scaled_x[:, :, :, np.newaxis] # np.newaxis : 가짜 차원 하나 추가

        # label encoding #
        ohe_y = np.array([tf.keras.utils.to_categorical(
            y, num_classes=10) for y in target])
        
        return expanded_x, ohe_y

    def get_train_dataset(self):
        return self.preprocess_dataset((self.train_x, self.train_y))

    def get_test_dataset(self):
        return self.preprocess_dataset((self.test_x, self.test_y))

그리고 VGGNet을 구현해보자.

  • tf.keras.layers.Conv2D

    • filters: layer에서 사용할 Filter(weights)의 갯수
    • kernel_size: Filter(weights)의 사이즈
    • strides: 몇 개의 pixel을 skip 하면서 훑어지나갈 것인지 (출력 피쳐맵의 사이즈에 영향을 줌)
    • padding: zero padding을 만들 것인지. VALID는 Padding이 없고, SAME은 Padding이 있음 (출력 피쳐맵의 사이즈에 영향을 줌)
    • activation: Activation Function을 지정
  • tf.keras.layers.MaxPool2D

    • pool_size: Pooling window 크기
    • strides: 몇 개의 pixel을 skip 하면서 훑어지나갈 것인지
    • padding: zero padding을 만들 것인지
  • tf.keras.layers.Flatten

  • tf.keras.layers.Dense

    • units : 노드 갯수
    • activation : 활성화 함수
    • use_bias : bias 를 사용 할 것인지
    • kernel_initializer : 최초 가중치를 어떻게 세팅 할 것인지
    • bias_initializer : 최초 bias를 어떻게 세팅 할 것인지

📌 모델 구성

from tensorflow.keras.layers import Conv2D, MaxPool2D, Flatten, Dense

model = tf.keras.Sequential()
# 최초의 레이어는 Input의 shape을 명시해준다. (이 때 배치 axis는 무시한다.)
model.add(Conv2D(32, kernel_size=3, padding='same', activation='relu', input_shape=(28, 28, 1))) 
model.add(Conv2D(32, kernel_size=3, padding='same', activation='relu'))
model.add(MaxPool2D())
model.add(Conv2D(64, kernel_size=3, padding='same', activation='relu')) 
model.add(Conv2D(64, kernel_size=3, padding='same', activation='relu'))
model.add(MaxPool2D())
model.add(Flatten())
model.add(Dense(128, activation="relu"))
model.add(Dense(64, activation="relu"))
model.add(Dense(10, activation="softmax")) # output class 개수 10

model.summary()

📌 optimizer, loss 선언 및 컴파일

learning_rate = 0.03
opt = tf.keras.optimizers.Adam(learning_rate)
loss = tf.keras.losses.categorical_crossentropy

model.compile(optimizer=opt, loss=loss, metrics=["accuracy"])

📌 학습 및 평가

# 학습
hist = model.fit(train_x, train_y,
                 epochs=2, batch_size=128,
                 validation_data=(test_x, test_y)) 
                # validation_data 를 지정해주면 .evaluate를 굳이 하지 않아도 매 epochs마다 저절로 해줌
                
                
                
# 평가
plt.figure(figsize=(10, 5))
plt.subplot(221)
plt.plot(hist.history['loss'])
plt.title("loss")
plt.subplot(222)
plt.plot(hist.history['accuracy'], 'b-')
plt.title("acc")
plt.subplot(223)
plt.plot(hist.history['val_loss'])
plt.title("val_loss")
plt.subplot(224)
plt.plot(hist.history['val_accuracy'], 'b-')
plt.title("val_accuracy")

plt.tight_layout()
plt.show()

💻 출력



Functional API model

앞서 만들었던 모델을 Functional API를 이용해 만든다면 아래와 같은 코드를 짤 수 있다.

from tensorflow.keras.layers import Input, Conv2D, MaxPool2D, Flatten, Dense

input_shape=(28, 28, 1)

inputs = Input(input_shape) # 입력을 받는 레이어가 첫번째 레이어

net = Conv2D(32, kernel_size=3, padding='same', activation='relu')(inputs) # 입력값을 inputs 변수로 넣어줌
net = Conv2D(32, kernel_size=3, padding='same', activation='relu')(net)
net = MaxPool2D()(net)

net = Conv2D(64, kernel_size=3, padding='same', activation='relu')(net)
net = Conv2D(64, kernel_size=3, padding='same', activation='relu')(net)
net = MaxPool2D()(net)
net = Flatten()(net)

net = Dense(128, activation="relu")(net)
net = Dense(64, activation="relu")(net)
net = Dense(10, activation="softmax")(net)

model = tf.keras.Model(inputs=inputs, outputs=net, name='VGG')
model.summary()

입력 데이터를 받는 입력 레이어가 첫 번째로 오고 다른 레이어들은 동일하게 써주되, 마지막에 input 데이터로 이전 레이어를 명시해주면서 레이어를 쌓아간다.

그리고 마지막에 tf.keras.Model를 선언하여 모델을 만들 수 있는데 이게 왜 Sequential보다 유연하다는 것일까?

ResNet 모델을 만들어보면서 <유연하다>의 의미를 한 번 살펴보자.


ResNet


ResNet은 기본 아이디어가 Input 데이터와 Output 데이터를 합쳐가며 학습을 진행했더니 성능이 더 좋더라~! 이다.


이렇듯 ResNet의 핵심은 왼쪽 그림과 같은 Input을 넣으면 그냥 Output을 출력해주는 흐름이 아닌 오른쪽 그림과 같이 Input과 Output을 더해주어야하는 것이다.

이를 Functional API로 구현해보자.


📌 Functional API로 구현한 ResNet

from tensorflow.keras.layers import Input, Conv2D, MaxPool2D, Flatten, Dense, Add

## Functional API 를 이용해 ResNet 구현
def build_resnet(input_shape):
    inputs = Input(input_shape) # 입력 데이터를 받는 첫 번째 레이어 # (28, 28)

    net = Conv2D(32, kernel_size=3, strides=2,
                 padding='same', activation='relu')(inputs) # (14, 14)
    net = MaxPool2D()(net) # (7, 7)
    
    net1 = Conv2D(64, kernel_size=1, padding='same', activation='relu')(net)
    net2 = Conv2D(64, kernel_size=3, padding='same', activation='relu')(net1)
    net3 = Conv2D(64, kernel_size=1, padding='same', activation='relu')(net2)
    
    # net과 net3을 더해주어야하는데 filter 개수가 32와 64로 맞지 않는다
    # 이를 맞춰주기 위해 net1_1 생성
    net1_1 = Conv2D(64, kernel_size=1, padding='same')(net)
    net = Add()([net1_1, net3]) # input과 output을 더해준 것
    
    net1 = Conv2D(64, kernel_size=1, padding='same', activation='relu')(net)
    net2 = Conv2D(64, kernel_size=3, padding='same', activation='relu')(net1)
    net3 = Conv2D(64, kernel_size=1, padding='same', activation='relu')(net2)
    
    net = Add()([net, net3]) # input과 output을 더해준 것
    
    net = MaxPool2D()(net)
    
    net = Flatten()(net)
    net = Dense(10, activation="softmax")(net)

    model = tf.keras.Model(inputs=inputs, outputs=net, name='resnet')
    
    return model

CIFAR 10 데이터로 학습해보자면

해당 데이터의 input shape에 맞게 모델을 불러와주고 학습시키면 된다.


📌 모델 선언

model = build_resnet((32, 32, 3))

lr = 0.03
opt = tf.keras.optimizers.Adam(lr)
loss = tf.keras.losses.categorical_crossentropy

model.compile(optimizer=opt, loss=loss, metrics=["accuracy"])

📌 학습 및 평가

# 학습
hist = model.fit(train_x, train_y,
                 epochs=10, batch_size=128,
                 validation_data=(test_x, test_y))
                 
# 평가
plt.figure(figsize=(10, 5))
plt.subplot(221)
plt.plot(hist.history['loss'])
plt.title("loss")
plt.subplot(222)
plt.plot(hist.history['accuracy'], 'b-')
plt.title("acc")
plt.subplot(223)
plt.plot(hist.history['val_loss'])
plt.title("val_loss")
plt.subplot(224)
plt.plot(hist.history['val_accuracy'], 'b-')
plt.title("val_accuracy")

plt.tight_layout()
plt.show()

모델 중간의 값을 활용할 수 있다는 점에서 Sequential보다 유연한 것 같다!



Sub class model

Linear Regression


📌 Linear Regression model

class LinearRegression(tf.keras.layers.Layer):
    def __init__(self, units):
        super(LinearRegression, self).__init__()
        self.units = units # output 개수

    def build(self, input_shape): # 변수를 받는 오버라이딩 
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal", # 어느 분포에서 random하게 추출할 것인지 설정
            trainable=True,
        )
        self.b = tf.Variable(0.0) # 위와 같이 만들어도 되고 이 문장과 같이 만들어도 됨

    def call(self, inputs): # 출력을 해줄 값을 지정하는 오버라이딩
        return tf.matmul(inputs, self.w) + self.b

tf.keras.layers.Layer를 상속받아 사용하는 것이기 때문에 <상속>과 <오버라이딩>에 익숙해질 필요가 있어보였다.

위 사진은 tf.keras.layers.Layer 메서드들을 볼 수 있는 공식 사이트이다. 필요한 메서드가 있다면 오버라이딩해서 사용하면 될 듯 하다.

가상의 데이터를 만들어 위 내용을 실습해보자.

📌 데이터 준비

# 정답
W_true = np.array([[3., 2., 4., 1.]]).reshape((4, 1))
B_true = np.array([1.]) 

X = tf.random.normal((500, 4))
noise = tf.random.normal((500, 1))

y = X @ W_true + B_true + noise

📌 학습

opt = tf.keras.optimizers.SGD(learning_rate=0.03)


linear_layer = LinearRegression(1)

for epoch in range(100):
    with tf.GradientTape() as tape:
        y_hat = linear_layer(X)
        loss = tf.reduce_mean(tf.square((y - y_hat)))
        
    grads = tape.gradient(loss, linear_layer.trainable_weights)
    # assign 대신 업데이트하는 방식
    opt.apply_gradients(zip(grads, linear_layer.trainable_weights))
    
    if epoch % 10 == 0:
        print("epoch : {} loss : {}".format(epoch, loss.numpy()))

기존에는 .assign을 이용하여 가중치를 업데이트해주었다면 .apply_gradients를 통해 업데이트 할 수 있다는 것을 배울 수 있었다.

그렇다면 이번에는 앞서 구현했던 ResNet 모델을 Sub class로 구현해보자.

ResNet

동일하게 반복되는 구간이 있었는데(conv2D 3번 - input과 output 더하는 코드) 이런 부분을 Residual Block이라 한다. 이 부분을 하나의 레이어로 만들어 은닉화 하는 과정을 거친 후 모델을 구현해보자.


📌 Residual Block

class ResidualBlock(tf.keras.layers.Layer):
    def __init__(self, filters=32, filter_match=False):
        super(ResidualBlock, self).__init__()
    
        # 필요한 레이어들 받아오기
        self.conv1 = Conv2D(filters, kernel_size=1, padding='same', activation='relu')
        self.conv2 = Conv2D(filters, kernel_size=3, padding='same', activation='relu')
        self.conv3 = Conv2D(filters, kernel_size=1, padding='same', activation='relu')
        self.add = Add()
        
        self.filters = filters
        self.filter_match = filter_match

        # 첫번째 step에서 filter 개수가 맞지 않아
        # 한 번 Conv2D를 다시 64로 생성하여 input과 output을 add해준 적이 있다.
        # 그 부분을 조건문으로 처리하면 아래와 같다.
        if filter_match:
            self.conv_ext = Conv2D(filters, kernel_size=1, padding='same')
        
    def call(self, inputs):
        net1 = self.conv1(inputs)
        net2 = self.conv2(net1)
        net3 = self.conv3(net2)
        
        if self.filter_match:
            res = self.add([self.conv_ext(inputs), net3])
        else: 
            res = self.add([inputs, net3])
        return res 
        
        

앞서 반복된 부분을 처리해주고,
첫 번째 input과 output을 더할 때 dim이 맞지 않아 conv2D를 한 번 더 해주고 더한 코드 부분은 조건문을 통해 처리하도록 해주었다.

이를 이용하여 ResNet 을 구성하면 아래와 같다.


📌 ResNet using Sub Class

class ResNet(tf.keras.Model):

    def __init__(self, num_classes):
        super(ResNet, self).__init__()
        
        self.conv1 = Conv2D(32, kernel_size=3, strides=2, padding='same', activation='relu')
        self.maxp1 =  MaxPool2D()
        self.block_1 = ResidualBlock(64, True)
        self.block_2 = ResidualBlock(64)
        self.maxp2 =  MaxPool2D()
        self.flat = Flatten()
        self.dense = Dense(num_classes)

    def call(self, inputs):
        x = self.conv1(inputs)
        x = self.maxp1(x)
        x = self.block_1(x)
        x = self.block_2(x)
        x = self.maxp2(x)
        x = self.flat(x)
        return self.dense(x)

아마 비교해보면 코드도 훨씬 간단해보일 것이다.

나는 아직 상속과 오버라이딩에 익숙하지 않아서 이해하는데에 시간이 조금 걸렸지만.. ㅠ 익숙해지기만 한다면 더할 나위없이 모델을 구성하는 데에 좋은 방법이 될 것 같다!!


👀 tf.keras.Model 공식 사이트

그리고 Functional API와 Sub Class로 만든 ResNet을 비교하여 보고 싶어서 정리해본 자료도 첨부해두겠다.

마지막으로 만든 ResNet 모델을 학습시키기 위해 CIFAR 10 데이터를 이용했으며 결과는 아래와 같다.

# train_y.shape (50000, 10)
model = ResNet(num_classes=10)

learning_rate = 0.03
opt = tf.keras.optimizers.Adam(learning_rate)
loss = tf.keras.losses.categorical_crossentropy

model.compile(optimizer=opt, loss=loss, metrics=["accuracy"])

hist = model.fit(train_x, train_y,
                 epochs=2, batch_size=128,
                 validation_data=(test_x, test_y))




Model 학습

Fit

model.compile() -> model.fit()을 통해 모델을 학습시킨다는 사실은 알고 있을 것이다.

몇 가지만 체크하고 넘어가자 :)

📌 compile()의 입력값

  • optimizer='rmsprop' : Optimizer
  • loss=None : Loss function
  • metrics=None : Metrics
  • loss_weights=None : loss가 여러 개인 경우 각 로스마다 다르게 중요도를 설정 할 수 있다.

이때 loss와 metrics를 지정할 수 있는 방법이 여러 가지가 있다.

그리고 여러 개의 loss와 metrics를 사용하고 싶다면 리스트 형태로 묶어주면 된다.

loss 지정하는 방법

1. tf.keras.losses 로 지정
우리가 그동안 많이 썼던 방법 !

learning_rate = 0.03
opt = tf.keras.optimizers.Adam(learning_rate)
loss = tf.keras.losses.categorical_crossentropy

model.compile(optimizer=opt, loss=loss, metrics=["accuracy"])

2. 사용자 정의 함수로 지정

 # 사용자 정의 loss : 입력으로 정답값, 예측값의 순서로 꼭 받아야함
 def custom_loss(y_true, y_pred):
     return tf.reduce_mean(tf.square(y_true - y_pred))

 model.compile(optimizer=opt, loss=custom_loss, metrics=["accuracy"])

여러 개로 지정하고 싶다면 아래와 같이 리스트의 형태로 묶어주면 되고 loss_weights를 이용해 각각의 loss에 대해 중요도까지 설정해주면 각 loss에 weights를 곱해서 더한 값을 최종 loss로 사용하게 된다.

#여러 개의 Loss : 리스트 형태로 입력
model.compile(optimizer=opt, loss=[loss, custom_loss], metrics=["accuracy"])

#여러 개의 Loss + loss weights
# 각각의 loss에 대한 중요도 조절
# 각 loss에 wieghts를 곱해서 더한 값을 최종 loss로 사용
model.compile(optimizer=opt, loss=[loss, custom_loss], loss_weights=[0.7, 0.3], metrics=["accuracy"])

3. 텍스트로 지정

loss = "categorical_crossentropy" # 이렇게 텍스트로 가능한 함수도 있음.

model.compile(optimizer=opt, loss=loss, metrics=["accuracy"])



metrics 지정하는 방법

1. tf.keras.metrics 로 지정

# mertircs에도 여러 개를 넣을 수 있음
acc = tf.keras.metrics.Accuracy()
auc = tf.keras.metrics.AUC()

model.compile(optimizer=opt, loss=loss, metrics=[acc, auc])

2. 사용자 정의 함수로 지정

def custom_metric(y_true, y_pred):

    true = tf.argmax(y_true, axis=-1)
    pred = tf.argmax(y_pred, axis=-1)

    return tf.reduce_sum(tf.cast(tf.equal(true, pred), tf.int32))

model.compile(optimizer=opt, loss=loss, metrics=[custom_metric])

Fit

compile을 완료했다면 이제 학습시킬 차례!

Fit의 입력값으로는 대강 아래와 같은 것들이 있다.

  • x=None
  • y=None
  • batch_size=None
  • epochs=1
  • verbose='auto' : 학습과정 출력문의 모드
  • callbacks=None : Callback 함수
  • validation_split=0.0 : 입력데이터의 일정 부분을 Validation 용 데이터로 사용함
  • validation_data=None : Validation 용 데이터
  • shuffle=True : 입력값을 Epoch 마다 섞는다.
  • class_weight=None : 클래스 별로 다른 중요도를 설정한다.
  • ...
hist = model.fit(train_x, 
                 train_y,
                 epochs=1, 
                 batch_size=128, 
                 validation_split=0.3,
                 verbose=1
                )

그동안 했던 방식과 같이 위와 같은 형태로 fit을 해주면 된다.

해당 코드가 돌아가는 동안 우리는 아무 작업도 할 수 없는 상태가 되는데, 함수가 돌아가는 와중에도 특정한 주기로 원하는 코드를 실행시킬 수 있게 해주는 것이 callback 함수이다.

callback

입력값으로 epoch과 lr를 받는 tf.keras.callbacks.LearningRateScheduler 를 이용해 작성되는 callback은 학습이 진행됨에 따라 lr를 점점 줄여서 안정적으로 수렴을 하도록 도움을 준다던가, earlystoping을 한다던가, history를 남기는 등 다양한 기능이 있다.

아래는 epoch이 10 이상이 되면 lr을 줄여 안정적으로 수렴할 수 있도록 해주는 callback 함수이다.

# epoch이 10을 초과할 때마다 lr을 감소시켜 안정적으로 수렴하도록 도움을 줌
def scheduler(epoch, lr):
    if epoch > 10:
        return lr * (0.9**(epoch - 10))
    else: 
        return lr

lr_scheduler = tf.keras.callbacks.LearningRateScheduler(scheduler)

hist = model.fit(train_x, 
                 train_y,
                 epochs=20, 
                 batch_size=128, 
                 validation_split=0.3,
                 verbose=1,
                 callbacks=[lr_scheduler],
                )

💻 출력

마지막 열에 loss도 같이 출력되는 것을 볼 수 있다!


Logic(Fit 과정을 직접 구현해보자)

앞서 model의 학습을 fit을 통해 해결했지만 자유도가 높은 모델을 구현하고자 한다면 학습하는 과정을 직접 코드로 구현해야하는 경우도 생긴다.

fit을 하드코딩해보자.

'''
for e in epochs:
    for batch_x, batch_y in dataset:
        pred = model(batch_x)
        loss_fn(batch_y, pred)

        gradients
        weight_update

        print
'''

대략적으로 우리는 위와 같은 흐름으로 학습이 진행되어야하는 것을 알고 있다.

lr, opt, loss, metrics를 선언해주고
학습하는 과정을 함수로 만든 다음,
각 epoch의 배치 데이터마다 코드가 돌아가는 과정을 구현해보자.


📌 lr, opt, loss, metrics를 선언

learning_rate = 0.03
# optimizer
opt = tf.keras.optimizers.Adam(learning_rate) # 클래스
# loss 
loss_fn = tf.keras.losses.categorical_crossentropy # 함수로 만든 것
# metrics
train_loss = tf.keras.metrics.Mean(name='train_loss') # 클래스 객체로 만든 것
train_accuracy = tf.keras.metrics.CategoricalAccuracy(name='train_accuracy')

클래스로 선언한 것들은 각 메서드를 사용할 수 있다는 말이기도 하다 :)


📌 학습 과정(pred, loss gradient update까지)

# 학습 과정(pred, loss gradient update까지)
@tf.function
def train_step(x, y) :
    with tf.GradientTape() as tape:
        pred = model(x) # 예측값
        loss = loss_fn(y, pred) # loss 계산
        
    gradients = tape.gradient(loss, model.trainable_variables) # gradient 계산
    opt.apply_gradients(zip(gradients, model.trainable_variables)) # 업데이트
    
    # metric
    train_loss(y)
    train_accuracy(y, pred)

@tf.function을 하는 이유는 아래와 같다.

@tf.function으로 함수 위에 붙여주면 GPU로 학습시켰을 때 학습 속도가 향상되는 효과를 볼 수 있다.
그냥 함수로 정의만 해두면 매 배치 데이터를 돌 때마다 계속 함수를 새로 불러오는 형식으로 진행되는데 @tf.function를 해두면 한 번 불러오고 저장해두므로 불러 쓰기가 더 쉬워진다.


📌 매 epoch에 대해 배치 데이터마다 학습하는 과정

batch_size = 64

for epoch in range(1): # epoch을 도는데 
    
    for i in range(train_x.shape[0] // batch_size): # 배치 데이터마다 계산
        idx = i * batch_size
        x, y = train_x[idx:idx+batch_size], train_y[idx:idx+batch_size] # 배치 데이터 추출
        train_step(x, y)  # 학습 과정
        print("\r {} / {}".format(i, train_x.shape[0] // batch_size), end='\r') # 진행률 출력
    
    # 한 epoch에 대한 loss와 acc 출력
    fmt = 'epoch {} loss: {}, accuracy: {}'
    print(fmt.format(epoch+1, 
                          train_loss.result(), # .result() : 객체의 값을 반환해줌 
                          train_accuracy.result() * 100)
         )
    
    # 초기화
    # 매 epoch마다 새로운 metric 값을 계산해야지,
    # 만약 초기화를 해주지 않으면 이전 epoch의 metric 값이 계속 누적된 채로 계산됨
    train_loss.reset_states()
    train_accuracy.reset_states()

이때 .reset_states()로 매 epoch 본연의 값을 가질 수 있도록 초기화해주는 것이 가장 중요하다!!!




Evaluation

Tensorboard

보통 딥러닝 프로젝트를 하면 여러가지 모델을 동시에 돌려두고 기다려야하는 시간이 길기 때문에 로그만으로 트래킹하기 힘들다. 따라서 Tensorboard의 시각화 툴을 이용하여 모델의 진행 상황을 파악하는 데에 도움을 얻을 수 있다.

https://www.tensorflow.org/tensorboard?hl=ko

TensorFlow에서 제공하는 시각화툴로 중간의 그래프나 여러가지 정보를 Web UI로 조회할 수 있는 툴인데,
나는 무슨 이유 때문인지 VSCode에서도 그렇고 Colab에서도 링크가 연결이 되지 않은 이슈가 있었다 😅

그래도 일단 정리해보는...

방법 1. fit 함수를 이용한다면 callback 함수 이용하기

fit을 할 때 callbacks 옵션을 사용할 수 있는데 이때 TensorBoard를 사용하는 방법이다.

# 로그끼리 섞이지 않도록 잘 정리하는 것이 중요
import datetime
cur_time = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
log_dir = 'logs/fit/' + cur_time # 'logs/fit/20231104-233628'

tb_callback = tf.keras.callbacks.TensorBoard(log_dir = log_dir)

model.fit(x=train_x, 
          y=train_y, 
          epochs=5,
          validation_data=(test_x, test_y),
          callbacks=[tb_callback]
         )

로그끼리 섞이지 않도록 개인의 규칙을 따라 폴더 경로를 설정해주고 fit했다면

!tensorboard --logdir logs/fit --bind_all을 통해 TensorBoard로 이동하는 링크가 하나 생길 것이다. 그 사이트에서 로그들의 결과를 확인할 수 있다.



방법 2. tf.summary 사용하기

current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
train_log_dir = 'logs/gradient_tape/' + current_time + '/train'
test_log_dir = 'logs/gradient_tape/' + current_time + '/test'

train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)

위와 같이 tf.summary를 이용해 파일을 생성한다는 선언을 해주고
학습을 해주면 된다.

batch_size = 64

num_of_batch_train = train_x.shape[0] // batch_size
num_of_batch_test = test_x.shape[0] // batch_size

for epoch in range(5):
    
    for i in range(num_of_batch_train):
        idx = i * batch_size
        x, y = train_x[idx:idx+batch_size], train_y[idx:idx+batch_size]
        train_step(x, y)
        print("\r Train : {} / {}".format(i, num_of_batch_train), end='\r')

        
    for i in range(num_of_batch_test):
        idx = i * batch_size
        x, y = test_x[idx:idx+batch_size], test_y[idx:idx+batch_size]
        test_step(x, y)
        print("\r Test : {} / {}".format(i, num_of_batch_test), end='\r')
        
    with train_summary_writer.as_default():
        tf.summary.scalar('loss', train_loss.result(), step=epoch)
        tf.summary.scalar('acc', train_accuracy.result(), step=epoch)
        
    with test_summary_writer.as_default():
        tf.summary.scalar('loss', test_loss.result(), step=epoch)
        tf.summary.scalar('acc', test_accuracy.result(), step=epoch)
        
    fmt = 'epoch {} loss: {}, accuracy: {}, test_loss: {}, test_acc: {}'
    print(fmt.format(epoch+1, 
                          train_loss.result(),
                          train_accuracy.result(),
                          test_loss.result(),
                          test_accuracy.result()
                    )
         )
    
    train_loss.reset_states()
    test_loss.reset_states()
    train_accuracy.reset_states()
    test_accuracy.reset_states()

이 또한 학습을 다 진행시킨 후 !tensorboard --logdir logs/gradient_tape을 실행시키면 Tensorboard로 이동할 수 있는 링크가 뜬다.

아래는 Tensorboard에 이미지 데이터를 기록하는 방법Confusion Matrix를 기록하는 방법이다.

📌 이미지 데이터를 기록

logdir = "logs/train_data/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
file_writer = tf.summary.create_file_writer(logdir)

for i in np.random.randint(10000, size=10):    
    img = train_x[i:i+1]
    with file_writer.as_default():
        tf.summary.image("Training Sample data : {}".format(i), img, step=0)

!tensorboard --logdir logs/train_data

📌 Confusion Matrix를 기록

import io
from sklearn.metrics import confusion_matrix

def plot_to_image(figure):
    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    plt.close(figure)
    buf.seek(0)
    image = tf.image.decode_png(buf.getvalue(), channels=4)
    image = tf.expand_dims(image, 0)
    return image

def plot_confusion_matrix(cm, class_names):

    figure = plt.figure(figsize=(8, 8))
    plt.imshow(cm)
    plt.title("Confusion matrix")
    plt.colorbar()
    tick_marks = np.arange(len(class_names))
    threshold = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            color = "white" if cm[i, j] > threshold else "black"
            plt.text(j, i, cm[i, j], horizontalalignment="center", color=color)

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    return figure

logdir = "logs/fit/cm/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
file_writer_cm = tf.summary.create_file_writer(logdir)

test_images = test_x[:100]
test_labels = np.argmax(test_y[:100], axis=1)

def log_confusion_matrix(epoch, logs):
    test_pred_raw = model.predict(test_images)
    test_pred = np.argmax(test_pred_raw, axis=1)
    
    classes = np.arange(10)
    cm = confusion_matrix(test_labels, test_pred, labels=classes)
    
    figure = plot_confusion_matrix(cm, class_names=classes)
    cm_image = plot_to_image(figure)

    with file_writer_cm.as_default():
        tf.summary.image("Confusion Matrix", cm_image, step=epoch)
        
# callback 정의
cm_callback = tf.keras.callbacks.LambdaCallback(on_epoch_end=log_confusion_matrix)

# 학습
model.fit(x=train_x, 
          y=train_y,
          epochs=5,
          batch_size=32,
          validation_data=(test_x, test_y),
          callbacks=[tensorboard_callback, cm_callback])

# 텐서보드로 확인
!tensorboard --logdir logs/fit

윈도우에서는 /가 아닌 \로 인식해야해서 안된다는 사람, 파일을 읽고 쓰는 권한이 없어서 안된다는 사람 등등.. 구글링해보니깐 상황이 여러가지이던데 나는 왜 안되는지 모르겠다 ㅜㅠ 403에러가 떴는데,, 나중에 함 알아봐야지...!!



Model Save and Load

모델을 저장하고 불러올 수 있는 방법으로 간단히 코드만 소개하겠다.

save 함수로 저장

# 저장
model.save("checkpoints/sample/model.h5") # 원하는 경로 입력

# 불러오기
model_loaded = tf.keras.models.load_model("checkpoints/sample/model.h5")

# 확인
model_loaded.summary()

save_weights 함수로 저장

weights만 저장 하므로, 저장공간이 절약된다.

# 저장
model.save_weights("checkpoints/sample/model.h5")

# 새로운 모델에 가중치 입히기
new_model = build_resnet((32, 32, 3))
# 불러오기
new_model.load_weights("checkpoints/sample/model.h5")

# 확인
print(model.predict(test_x[:1]))
print(new_model.predict(test_x[:1]))

기존의 모델과 동일하게 예측값을 보여주는 것을 확인할 수 있다.


Callbacks 함수로 저장

# 저장
save_path = 'checkpoints/{epoch:02d}-{val_loss:.2f}.h5'
checkpoint = tf.keras.callbacks.ModelCheckpoint(save_path, 
                                                monitor='val_accuracy', 
                                                save_best_only=True)

model.fit(x=train_x, 
          y=train_y, 
          epochs=1,
          validation_data=(test_x, test_y), 
          callbacks=[checkpoint])

pb 형식으로 저장

# 저장
save_path = 'checkpoints/{epoch:02d}-{val_loss:.2f}'
checkpoint = tf.keras.callbacks.ModelCheckpoint(save_path, 
                                                monitor='val_accuracy', 
                                                save_best_only=True)

model.fit(x=train_x, 
          y=train_y, 
          epochs=1,
          validation_data=(test_x, test_y), 
          callbacks=[checkpoint])

# 불러오기
model = tf.saved_model.load("checkpoints/01-2.32")




데이터 다루기

드디어 텐서플로의 마지막 ! 데이터를 다루는 방법이다.
이 파트에서는 데이터를 어떻게 읽어들이는지에 대해 정리해보겠다.

데이터 읽기

지금은 로컬 환경에서 데이터를 읽는 작업이다.

# 특정 반복되는 형태의 파일을 쉽게 부를 수 있음
glob("../../datasets/cifar/train/*.png")

우리는 glob을 이용해 특정 반복되는 형태의 파일을 쉽게 읽을 수 있다.

위 데이터는 CIFAR 데이터인데 데이터가 엄청 많은 경우 일일이 읽어들인다면 메모리가 터질 것이다.

이때 tf.data.API를 이용하면 된다.

tf.data.API

미리 이미지 데이터를 모두 불러오는게 아니라 그 때 그 때 처리를 하는 방식으로 진행되어 속도가 더 빠르다.

train_img = glob("../../datasets/cifar/train/*.png") # 읽어들일 파일 경로

# train_img를 하나하나 받아옴
# dataset = tf.data.Dataset.from_tensor_slices(train_img)


# tf.data.API
AUTOTUNE = tf.data.experimental.AUTOTUNE
                # .map 이후의 코드를 실행시킴
dataset = dataset.map(read_img, num_parallel_calls=AUTOTUNE)
# num_parallel_calls은 병렬처리 수준을 정하는 것
# AUTOTUNE으로 해두면 자동으로 정해준다. 

# 배치 처리
dataset = dataset.batch(32)
dataset = dataset.prefetch(AUTOTUNE) 
        # 앞 코드의 32배치를 읽어들이는 와중에 병렬적으로 그 다음 배치를 읽어들이고 있음
        # 스칼라 값을 넣거나 , AUTOTUNE으로 설정시 자동으로 그 값을 설정해줌

# next(iter(dataset))

dataset = dataset.shuffle(buffer_size=10) # 내부적으로 얼마만큼 섞을 것인가

# 여러번 epoch을 돌 때
dataset = dataset.repeat() # 입력값으로 숫자를 잘 넣지는 않음
# 아무것도 설정하지 않은채로 이후 for문의 코드에 epoch 숫자를 정해줌

대부분 딥러닝에서의 전처리의 기본 세트이다.

# 대부분 딥러닝에서의 전처리의 기본 세트
AUTOTUNE = tf.data.experimental.AUTOTUNE
dataset = dataset.map(read_img, num_parallel_calls=AUTOTUNE)
dataset = dataset.batch(32)
dataset = dataset.prefetch(AUTOTUNE) 
dataset = dataset.shuffle(buffer_size=10) 
dataset = dataset.repeat() 

이미지를 읽을 때 경로에서 Label이 있었다.
이미지를 넘겨줄 때 Label까지 같이 넘겨주는 것이 좋을 것이다!


📌 Label 확인하기

label_names = tf.io.read_file("../../datasets/cifar/labels.txt").numpy().decode('ascii').strip().split("\n")

np.array('frog' == np.array(label_names)) # 클래스에 해당하는 값이 True가 됨
# 하지만 학습에 사용되는 것은 원핫인코딩된 값이므로 변환 필요

np.array('frog' == np.array(label_names), dtype = np.float32) # 우리가 원하는 값!

📌 전체 데이터셋에 대하여

# 전체 데이터셋에 대하여
label_txt = tf.io.read_file("../../datasets/cifar/labels.txt")
label_names = np.array(label_txt.numpy().decode('ascii').strip().split("\n"))

def parse_label(path):
    name = path.split("/")[-1].split(".")[0].split("_")[-1]
    return np.array(name == label_names, dtype=np.float32)

train_y = np.array([parse_label(y) for y in train_img])

# image, Label 넘겨주기
def read_data(path, label):
    img = read_img(path)
    return img, label

dataset = tf.data.Dataset.from_tensor_slices((train_img, train_y))

# 대부분 딥러닝에서의 전처리의 기본 세트
AUTOTUNE = tf.data.experimental.AUTOTUNE
dataset = dataset.map(read_data, num_parallel_calls=AUTOTUNE)
dataset = dataset.batch(32)
dataset = dataset.prefetch(AUTOTUNE) 
dataset = dataset.shuffle(buffer_size=10) 
dataset = dataset.repeat()

하지만 위 코드에서 read_data 함수에서 label을 입력값으로 받지 않고 return에서 바로 label을 넘겨주는 것이 코드가 더 효율적일 것이다.

.map 함수 뒤에는 tensor 연산이 와야하므로 이를 주의하며 label을 return에서 넘겨주는 것을 map 함수로 처리해보면 아래와 같다.


📌 조금 더 효율적인 코드

def get_label(path):
    f_name = tf.strings.split(path, '_')[-1] # 파일 이름
    lbl_name = tf.strings.regex_replace(f_name, '.png', '') # 파일 이름에서 클래스 이름 추출
	
    # 클래스 이름과 일치하는 원핫인코딩
    return tf.cast(lbl_name == label_names, tf.float32)


def load_image_label(path):
    gfile = tf.io.read_file(path)
    image = tf.io.decode_image(gfile)
    label = get_label(path)

    return image, label

dataset = tf.data.Dataset.from_tensor_slices(train_img)

# 대부분 딥러닝에서의 전처리의 기본 세트
AUTOTUNE = tf.data.experimental.AUTOTUNE
dataset = dataset.map(load_image_label, num_parallel_calls=AUTOTUNE)
dataset = dataset.batch(32)
dataset = dataset.prefetch(AUTOTUNE) 
dataset = dataset.shuffle(buffer_size=10) 
dataset = dataset.repeat() 



ImageDataGenerator

(굳이 로컬 환경이 아니더라도) 데이터를 불러오는 동시에 여러가지 전처리를 쉽게 구현할 수 있는 기능으로 tf.data.API 보다 더 편리한 기능을 소개해보겠다.

datagen = ImageDataGenerator(
    # 아래의 작업을 랜덤하게 적용 또는 적용 X
    rotation_range=20, # 20도 각도 내에서 회전
    width_shift_range=0.2, # 가로로 이동
    height_shift_range=0.2, # 세로로 이동
    horizontal_flip=True) # 가로축 반전

이게 끝이다!!ㅋㅋㅋㅋ


flow

  • 데이터를 모두 메모리에 불러두고 사용할 때 유용

train_xtrain_y를 불러온 후

result = next(iter(datagen.flow((train_x, train_y))))

x, y = result
x.shape, y.shape # ((32, 32, 32, 3), (32, 10))

위 코드를 실행시켜주면 랜덤하게 이미지가 바뀌는 것을 볼 수 있을 것이다.


flow_from_directory

  • 데이터가 너무 커서 하나씩 불러와야 할 때 유용

하지만 이 모듈을 사용하려면 조건이 있다.
위 사진과 같이

  1. 분류나 클래스가 숫자로 구분되어있는 문제에 사용 가능
  2. 클래스가 각 폴더 별로 나누어져 있고
  3. 각 폴더 아래 클래스에 해당하는 파일들이 존재하는 데이터

위 3가지 조건에 해당해야 사용 가능하다.

지금 MNIST 데이터가 그렇게 되어있으니 한 번 이용해보자.

input_shape = (28, 28, 1)
batch_size = 32

gen =  datagen.flow_from_directory(
    train_dir,
    target_size = input_shape[:2] , # 입력데이터를 어떤 사이즈로 줄여줄지
    batch_size = batch_size,
    color_mode = 'grayscale' # 채널 설정
)
# Found 60000 images belonging to 10 classes.

x , y = next(iter(gen))
x.shape, y.shape # ((32, 28, 28, 1), (32, 10))

flow_from_DataFrame


위와 같이 경로와 타겟이 데이터프레임으로 만들어져있는 경우 사용할 수 있는 모듈이다.

위 데이터프레임은 직접 만들 수도 있으므로 flow_from_directory보단 자유로우며 메모리도 더 적게 사용한다.

gen = datagen.flow_from_dataframe(
    train_data,
    x_col="path",
    y_col="class_name",
    target_size=(32, 32), # input_size
    color_mode="rgb", # 채널 수
    class_model="categorical", 
    batch_size=32
)
# Found 50000 validated image filenames belonging to 10 classes.

x, y = next(iter(gen))
x.shape, y.shape # ((32, 32, 32, 3), (32, 10))

# 모델 생성 후 
# 학습은 model.fit(gen) 으로 가능
profile
데이터 분석가(가 되고픈) 황성미입니다!

0개의 댓글