Image segmentation
Let's work through an image segmentation task. The mean IoU we compute at the end is an evaluation metric widely used in image segmentation.
!pip install tensorflow-addons
Requirement already satisfied: tensorflow-addons in /usr/local/lib/python3.10/dist-packages (0.23.0)
Requirement already satisfied: packaging in /usr/local/lib/python3.10/dist-packages (from tensorflow-addons) (23.2)
Requirement already satisfied: typeguard<3.0.0,>=2.7 in /usr/local/lib/python3.10/dist-packages (from tensorflow-addons) (2.13.3)
use_colab = True
assert use_colab in [True, False]
from google.colab import drive
drive.mount('/content/drive')
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
from __future__ import absolute_import, division
from __future__ import print_function, unicode_literals
import os
import time
import shutil
import functools
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import matplotlib.image as mpimg
import pandas as pd
from PIL import Image
from IPython.display import clear_output
import tensorflow as tf
import tensorflow_addons as tfa
print(tf.__version__)
from tensorflow.python.keras import layers
from tensorflow.python.keras import losses
from tensorflow.python.keras import models
# 2.15.0
is_train = True
model_name = 'ed_model'
assert model_name in ['ed_model', 'u-net']
This project uses the Giana dataset.
if use_colab:
    DATASET_PATH = '/content/drive/MyDrive/dataset/segmentation'
else:
    DATASET_PATH = './'
dataset_dir = os.path.join(DATASET_PATH)  # path where the dataset lives
img_dir = os.path.join(dataset_dir, "train")  # e.g. ./dataset + train => ./dataset/train
label_dir = os.path.join(dataset_dir, "train_labels")
x_train_filenames = [os.path.join(img_dir, filename) for filename in os.listdir(img_dir)]  # list comprehension: build the full paths in one line
x_train_filenames.sort()
y_train_filenames = [os.path.join(label_dir, filename) for filename in os.listdir(label_dir)]
y_train_filenames.sort()
x_train_filenames, x_test_filenames, y_train_filenames, y_test_filenames = \
train_test_split(x_train_filenames, y_train_filenames, test_size=0.2)
num_train_examples = len(x_train_filenames)
num_test_examples = len(x_test_filenames)
print("Number of training examples: {}".format(num_train_examples))
print("Number of test examples: {}".format(num_test_examples))
#Number of training examples: 240
#Number of test examples: 60
x_train_filenames[0]
#/content/drive/MyDrive/dataset/segmentation/train/127.bmp
Let's look at 5 images (display_num) from the dataset.
display_num = 5
r_choices = np.random.choice(num_train_examples, display_num)
plt.figure(figsize=(10, 15))
for i in range(0, display_num * 2, 2):
    img_num = r_choices[i // 2]
    x_pathname = x_train_filenames[img_num]
    y_pathname = y_train_filenames[img_num]
    plt.subplot(display_num, 2, i + 1)
    plt.imshow(Image.open(x_pathname))
    plt.title("Original Image")
    example_labels = Image.open(y_pathname)
    label_vals = np.unique(example_labels)
    plt.subplot(display_num, 2, i + 2)
    plt.imshow(example_labels)
    plt.title("Masked Image")
plt.suptitle("Examples of Images and their Masks")
plt.show()
# Set hyperparameters
image_size = 256
img_shape = (image_size, image_size, 3)
batch_size = 32
max_epochs = 200
if use_colab:
    checkpoint_dir = './drive/My Drive/train_ckpt/segmentation/exp1'
    if not os.path.isdir(checkpoint_dir):
        os.makedirs(checkpoint_dir)
else:
    checkpoint_dir = './train_ckpt/segmentation/exp1'
Data augmentation is a technique widely used in deep-learning image tasks (classification, detection, segmentation, and so on). It "grows" the training set by applying several random transformations to the data, so during training the model never sees exactly the same picture twice. This helps prevent overfitting and helps the model generalize better to data it has never seen. The transformations we use:
- hue_delta: adjusts the hue of an RGB image by a random factor. This applies only to the actual image (not the label image). hue_delta must lie in the interval [0, 0.5].
- horizontal_flip: flips the image horizontally along its central axis with probability 0.5. This transformation must be applied to both the label and the actual image.
- width_shift_range and height_shift_range: ranges (as a fraction of total width or height) within which to randomly translate the image horizontally or vertically. These must be applied to both the label and the actual image.
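As a quick illustration of these semantics (a minimal sketch on placeholder tensors, not dataset images): the hue shift touches only the image, while a flip must use one shared coin toss so image and mask stay aligned.
sample_img = tf.random.uniform([256, 256, 3])  # stand-in RGB image
sample_mask = tf.cast(tf.random.uniform([256, 256, 1]) > 0.5, tf.float32)  # stand-in binary mask
aug_img = tf.image.random_hue(sample_img, 0.1)  # image only: hue is meaningless for a mask
if tf.random.uniform([]) < 0.5:  # one shared coin toss for both tensors
    aug_img = tf.image.flip_left_right(aug_img)
    sample_mask = tf.image.flip_left_right(sample_mask)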
Processing each pathname
def _process_pathnames(fname, label_path):
    # We map this function onto each pathname pair
    img_str = tf.io.read_file(fname)
    img = tf.image.decode_bmp(img_str, channels=3)  # RGB
    label_img_str = tf.io.read_file(label_path)
    label_img = tf.image.decode_bmp(label_img_str, channels=0)  # BMP mask with values 0/1
    resize = [image_size, image_size]
    img = tf.image.resize(img, resize)
    label_img = tf.image.resize(label_img, resize)
    # scale the 0-255 RGB values down to [0, 1]
    scale = 1 / 255.
    img = tf.cast(img, dtype=tf.float32) * scale
    label_img = tf.cast(label_img, dtype=tf.float32) * scale
    # keeping inputs small and near 0 matches the range of the model's weight initialization
    return img, label_img
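A quick spot-check of this function on one pair (a sketch; assumes the filename lists built above are available):
sample_img, sample_lbl = _process_pathnames(x_train_filenames[0], y_train_filenames[0])
print(sample_img.shape, sample_lbl.shape)  # expect (256, 256, 3) and (256, 256, C), C taken from the BMP header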
def shift_img(output_img, label_img, width_shift_range, height_shift_range):
    """This fn will perform the horizontal or vertical shift"""
    if width_shift_range or height_shift_range:
        if width_shift_range:
            width_shift_range = tf.random.uniform([],
                                                  -width_shift_range * img_shape[1],
                                                  width_shift_range * img_shape[1])
        if height_shift_range:
            height_shift_range = tf.random.uniform([],
                                                   -height_shift_range * img_shape[0],
                                                   height_shift_range * img_shape[0])
        output_img = tfa.image.translate(output_img,
                                         [width_shift_range, height_shift_range])
        label_img = tfa.image.translate(label_img,
                                        [width_shift_range, height_shift_range])
    return output_img, label_img
def flip_img(horizontal_flip, vertically_flip, tr_img, label_img):
    if horizontal_flip:
        flip_prob = tf.random.uniform([], 0.0, 1.0)
        tr_img, label_img = tf.cond(tf.less(flip_prob, 0.5),
                                    lambda: (tf.image.flip_left_right(tr_img), tf.image.flip_left_right(label_img)),
                                    lambda: (tr_img, label_img))
    if vertically_flip:
        flip_prob = tf.random.uniform([], 0.0, 1.0)
        tr_img, label_img = tf.cond(tf.less(flip_prob, 0.5),
                                    lambda: (tf.image.flip_up_down(tr_img), tf.image.flip_up_down(label_img)),
                                    lambda: (tr_img, label_img))
    return tr_img, label_img
def _augment(img,
             label_img,
             resize=None,  # Resize the image to some size e.g. [256, 256]
             scale=1,  # Scale image e.g. 1 / 255.
             hue_delta=0.01,  # Adjust the hue of an RGB image by a random factor
             horizontal_flip=True,  # Random left-right flip
             vertically_flip=True,  # Random up-down flip
             width_shift_range=0.1,  # Randomly translate the image horizontally
             height_shift_range=0.1):  # Randomly translate the image vertically
    if resize is not None:
        # Resize both images
        label_img = tf.image.resize(label_img, resize)
        img = tf.image.resize(img, resize)
    if hue_delta:
        img = tf.image.random_hue(img, hue_delta)
    img, label_img = flip_img(horizontal_flip, vertically_flip, img, label_img)
    img, label_img = shift_img(img, label_img, width_shift_range, height_shift_range)
    label_img = tf.cast(label_img, dtype=tf.float32) * scale
    img = tf.cast(img, dtype=tf.float32) * scale
    return img, label_img
def get_baseline_dataset(filenames,
                         labels,
                         preproc_fn=functools.partial(_augment),
                         threads=2,
                         batch_size=batch_size,
                         is_train=True):
    num_x = len(filenames)
    # Create a dataset from the filenames and labels
    dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
    # Map our preprocessing function to every element in our dataset, taking
    # advantage of multithreading
    dataset = dataset.map(_process_pathnames, num_parallel_calls=threads)
    if is_train:
        #if preproc_fn.keywords is not None and 'resize' not in preproc_fn.keywords:
        #    assert batch_size == 1, "Batching images must be of the same size"
        dataset = dataset.map(preproc_fn, num_parallel_calls=threads)
        dataset = dataset.shuffle(num_x * 10)
    dataset = dataset.batch(batch_size)
    return dataset
Set up data augmentation so it runs on the train dataset only, then build both datasets.
train_dataset = get_baseline_dataset(x_train_filenames,  # training images
                                     y_train_filenames)  # ground-truth masks
test_dataset = get_baseline_dataset(x_test_filenames,
                                    y_test_filenames,
                                    is_train=False)
print(train_dataset)
print(test_dataset)
#<_BatchDataset element_spec=(TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, None, None, None), dtype=tf.float32, name=None))>
#<_BatchDataset element_spec=(TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 256, 256, None), dtype=tf.float32, name=None))>
# Colab's processors are slow, so preprocessing takes quite a while. Please be patient!
for images, labels in train_dataset.take(1):
    # Running next element in our graph will produce a batch of images
    plt.figure(figsize=(10, 10))
    img = images[0]
    plt.subplot(1, 2, 1)
    plt.imshow(img)
    plt.subplot(1, 2, 2)
    plt.imshow(labels[0, :, :, 0])
    plt.show()
The goal of this project is to build two networks.
Encoder-Decoder style network
The encoder works by compressing our training data into a lower-dimensional space.
The decoder then learns to regenerate, from the representation the encoder compressed, an output that matches the label data.
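To see why the bottleneck below ends up at 16x16, note that each stride-2 convolution with 'same' padding halves the spatial resolution; a quick sketch of the arithmetic:
size = 256
for _ in range(4):  # four stride-2 convolutions in the encoder below
    size //= 2  # 256 -> 128 -> 64 -> 32 -> 16
print(size)  # 16, so the bottleneck is [batch_size, 16, 16, 256]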
if model_name == 'ed_model':
    encoder = tf.keras.Sequential(name='encoder')
    # inputs: [batch_size, 256, 256, 3]
    # conv-batchnorm-activation blocks; stride-2 convs do the downsampling (no max-pool)
    encoder.add(layers.Conv2D(32, (3, 3), padding='same'))
    encoder.add(layers.BatchNormalization())
    encoder.add(layers.Activation('relu'))
    encoder.add(layers.Conv2D(32, (3, 3), strides=2, padding='same'))
    encoder.add(layers.BatchNormalization())
    encoder.add(layers.Activation('relu'))  # conv1: [batch_size, 128, 128, 32]
    encoder.add(layers.Conv2D(64, (3, 3), padding='same'))
    encoder.add(layers.BatchNormalization())
    encoder.add(layers.Activation('relu'))
    encoder.add(layers.Conv2D(64, (3, 3), strides=2, padding='same'))
    encoder.add(layers.BatchNormalization())
    encoder.add(layers.Activation('relu'))  # conv2: [batch_size, 64, 64, 64]
    encoder.add(layers.Conv2D(128, (3, 3), padding='same'))
    encoder.add(layers.BatchNormalization())
    encoder.add(layers.Activation('relu'))
    encoder.add(layers.Conv2D(128, (3, 3), strides=2, padding='same'))
    encoder.add(layers.BatchNormalization())
    encoder.add(layers.Activation('relu'))  # conv3: [batch_size, 32, 32, 128]
    encoder.add(layers.Conv2D(256, (3, 3), padding='same'))
    encoder.add(layers.BatchNormalization())
    encoder.add(layers.Activation('relu'))
    encoder.add(layers.Conv2D(256, (3, 3), strides=2, padding='same'))
    encoder.add(layers.BatchNormalization())
    encoder.add(layers.Activation('relu'))  # conv4-outputs: [batch_size, 16, 16, 256]
# sanity check: make sure the encoder was built correctly
if model_name == 'ed_model':
    bottleneck = encoder(tf.random.normal([batch_size, 256, 256, 3]))
    print(bottleneck.shape)
if model_name == 'ed_model':
    decoder = tf.keras.Sequential(name='decoder')
    # inputs: [batch_size, 16, 16, 256]
    decoder.add(layers.Conv2DTranspose(128, (3, 3), strides=2, padding='same'))
    decoder.add(layers.BatchNormalization())
    decoder.add(layers.Activation('relu'))  # conv_transpose1: [batch_size, 32, 32, 128]
    decoder.add(layers.Conv2D(128, (3, 3), padding='same'))
    decoder.add(layers.BatchNormalization())
    decoder.add(layers.Activation('relu'))
    decoder.add(layers.Conv2DTranspose(64, (3, 3), strides=2, padding='same'))
    decoder.add(layers.BatchNormalization())
    decoder.add(layers.Activation('relu'))  # conv_transpose2: [batch_size, 64, 64, 64]
    decoder.add(layers.Conv2D(64, (3, 3), padding='same'))
    decoder.add(layers.BatchNormalization())
    decoder.add(layers.Activation('relu'))
    decoder.add(layers.Conv2DTranspose(32, (3, 3), strides=2, padding='same'))
    decoder.add(layers.BatchNormalization())
    decoder.add(layers.Activation('relu'))  # conv_transpose3: [batch_size, 128, 128, 32]
    decoder.add(layers.Conv2D(32, (3, 3), padding='same'))
    decoder.add(layers.BatchNormalization())
    decoder.add(layers.Activation('relu'))
    decoder.add(layers.Conv2DTranspose(16, (3, 3), strides=2, padding='same'))
    decoder.add(layers.BatchNormalization())
    decoder.add(layers.Activation('relu'))  # conv_transpose4: [batch_size, 256, 256, 16]
    decoder.add(layers.Conv2D(16, (3, 3), padding='same'))
    decoder.add(layers.BatchNormalization())
    decoder.add(layers.Activation('relu'))  # conv: [batch_size, 256, 256, 16]
    ### output layer
    decoder.add(layers.Conv2D(1, 1, strides=1, padding='same', activation='sigmoid'))
    # decoder.add(layers.Conv2DTranspose(1, (1, 1), strides=1, padding='same', activation='sigmoid'))  # [256, 256, 1]
# sanity check: make sure the decoder was built correctly
if model_name == 'ed_model':
    predictions = decoder(bottleneck)
    print(predictions.shape)
if model_name == 'ed_model':  # assemble the final model
    ed_model = tf.keras.Sequential()
    ed_model.add(encoder)
    ed_model.add(decoder)
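An optional end-to-end shape check on the assembled model (a sketch, reusing the random-input idea from the checks above):
if model_name == 'ed_model':
    out = ed_model(tf.random.normal([1, 256, 256, 3]))
    print(out.shape)  # expect (1, 256, 256, 1), matching the mask shape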
U-Net
Q. What characterizes the U-Net architecture?
It is a fully convolutional network (FCN) that uses skip connections, which makes training more efficient.
The tf.keras
Functional API - Model subclassing
Unlike in the Encoder-Decoder structure, in U-Net a layer's outputs are not simply the inputs of the next layer, so tf.keras.Sequential() cannot be used.
The API for building non-sequential networks is the tf.keras functional API.
The model-subclassing approach implements a model by inheriting from the tf.keras.Model class.
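Before the project classes below, a minimal self-contained example of model subclassing (illustration only; TinyModel is not part of this project):
class TinyModel(tf.keras.Model):
    def __init__(self):
        super(TinyModel, self).__init__()
        self.dense = layers.Dense(1)  # layers with trainable weights live in __init__
    def call(self, inputs):  # call() wires the layers together
        return self.dense(inputs)
tiny = TinyModel()
print(tiny(tf.zeros([2, 4])).shape)  # (2, 1)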
if model_name == 'u-net':
    # The model is defined as a class.
    class Conv(tf.keras.Model):
        # __init__ is where the ingredients are prepared.
        # Anything prefixed with self. is a variable or function managed by the class.
        def __init__(self, num_filters, kernel_size):
            # Layers with trainable weights must be created inside __init__.
            super(Conv, self).__init__()
            # Layers declared in __init__ are the ones with trainable parameters.
            # padding='same' keeps spatial dims so encoder/decoder feature maps can be concatenated later
            self.conv1 = layers.Conv2D(num_filters, kernel_size, padding='same')
            self.bn = layers.BatchNormalization()
            self.relu = layers.Activation('relu')
        # call() is where the model is actually assembled.
        # Gather the layer ingredients in __init__, then wire (connect) them in call().
        def call(self, inputs, training=True):  # __call__
            x = self.conv1(inputs)             # layers.Conv2D
            x = self.bn(x, training=training)  # layers.BatchNormalization
            x = self.relu(x)                   # ReLU
            return x
    # When the class is instantiated:
    # conv = Conv(num_filters=16, kernel_size=3)  # runs Conv's __init__!
    # conv(inputs)  # call => conv-bn-relu
if model_name == 'u-net':
    class ConvBlock(tf.keras.Model):
        def __init__(self, num_filters):
            super(ConvBlock, self).__init__()
            self.conv1 = Conv(num_filters, 3)  # conv-bn-relu
            self.conv2 = Conv(num_filters, 3)  # conv-bn-relu
        def call(self, inputs, training=True):
            conv_block = self.conv1(inputs, training=training)
            conv_block = self.conv2(conv_block, training=training)
            return conv_block

    class EncoderBlock(tf.keras.Model):
        def __init__(self, num_filters):
            super(EncoderBlock, self).__init__()
            self.conv_block = ConvBlock(num_filters)
            self.encoder_pool = layers.MaxPooling2D((2, 2), strides=(2, 2))
        def call(self, inputs, training=True):
            encoder = self.conv_block(inputs, training=training)
            encoder_pool = self.encoder_pool(encoder)
            return encoder_pool, encoder

    class DecoderBlock(tf.keras.Model):
        def __init__(self, num_filters):
            super(DecoderBlock, self).__init__()
            self.convT = layers.Conv2DTranspose(num_filters, (2, 2), strides=(2, 2), padding='same')
            self.bn = layers.BatchNormalization()
            self.relu = layers.Activation('relu')
            self.conv_block = ConvBlock(num_filters)
        def call(self, input_tensor, concat_tensor, training=True):  # convT - bn - relu
            decoder = self.convT(input_tensor)                       # ConvT
            decoder = self.bn(decoder, training=training)            # bn
            decoder = self.relu(decoder)                             # relu
            decoder = tf.concat([decoder, concat_tensor], axis=-1)   # concat with the skip connection
            decoder = self.conv_block(decoder, training=training)    # conv_block
            return decoder
if model_name == 'u-net':
    class UNet(tf.keras.Model):
        def __init__(self):
            super(UNet, self).__init__()
            self.encoder_block1 = EncoderBlock(32)   # 32
            self.encoder_block2 = EncoderBlock(64)   # 64
            self.encoder_block3 = EncoderBlock(128)  # 128
            self.encoder_block4 = EncoderBlock(256)  # 256
            self.center = ConvBlock(512)             # 512 conv block
            self.decoder_block4 = DecoderBlock(256)  # 256
            self.decoder_block3 = DecoderBlock(128)  # 128
            self.decoder_block2 = DecoderBlock(64)   # 64
            self.decoder_block1 = DecoderBlock(32)   # 32
            self.output_conv = layers.Conv2D(1, (1, 1), activation='sigmoid')  # 1 output channel
        def call(self, inputs, training=True):
            encoder1_pool, encoder1 = self.encoder_block1(inputs, training=training)         # two outputs
            encoder2_pool, encoder2 = self.encoder_block2(encoder1_pool, training=training)  # two outputs
            encoder3_pool, encoder3 = self.encoder_block3(encoder2_pool, training=training)  # two outputs
            encoder4_pool, encoder4 = self.encoder_block4(encoder3_pool, training=training)  # two outputs
            center = self.center(encoder4_pool, training=training)
            decoder4 = self.decoder_block4(center, encoder4, training=training)    # two inputs
            decoder3 = self.decoder_block3(decoder4, encoder3, training=training)  # two inputs
            decoder2 = self.decoder_block2(decoder3, encoder2, training=training)  # two inputs
            decoder1 = self.decoder_block1(decoder2, encoder1, training=training)  # two inputs
            outputs = self.output_conv(decoder1)
            return outputs
if model_name == 'u-net':
    unet_model = UNet()
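As with the encoder-decoder model, an optional shape check (a sketch):
if model_name == 'u-net':
    out = unet_model(tf.random.normal([1, 256, 256, 3]), training=False)
    print(out.shape)  # expect (1, 256, 256, 1)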
The loss function we will use combines the following:
binary cross entropy
dice_loss
We add this extra loss to locate the target region more accurately in the Image Segmentation task.
The reason for adding dice loss is to perform the segmentation task better: training with cross-entropy loss and dice loss together pushes mean IoU higher.
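Concretely, with the labels y and predictions y-hat flattened to vectors, the soft dice coefficient implemented below is (note the squared terms in the denominator, a common soft-dice variant, and the smoothing term epsilon):
\mathrm{Dice}(y, \hat{y}) = \frac{2\sum_i y_i \hat{y}_i + \varepsilon}{\sum_i y_i^2 + \sum_i \hat{y}_i^2 + \varepsilon},
\qquad \mathcal{L}_{\mathrm{dice}} = 1 - \mathrm{Dice}(y, \hat{y})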
def dice_coeff(y_true, y_pred):
    smooth = 1e-10
    # Flatten
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    score = (2. * intersection + smooth) / (tf.reduce_sum(tf.square(y_true_f)) +
                                            tf.reduce_sum(tf.square(y_pred_f)) + smooth)
    return score

def dice_loss(y_true, y_pred):
    loss = 1 - dice_coeff(y_true, y_pred)
    return loss

def bce_dice_loss(y_true, y_pred):
    loss = tf.reduce_mean(losses.binary_crossentropy(y_true, y_pred)) + \
           dice_loss(y_true, y_pred)
    return loss
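A tiny numeric sanity check of the combined loss (illustrative values only):
y_true = tf.constant([[1., 0., 1., 1.]])
y_pred_good = tf.constant([[0.9, 0.1, 0.8, 0.7]])
y_pred_bad = tf.constant([[0.1, 0.9, 0.2, 0.3]])
print(bce_dice_loss(y_true, y_pred_good).numpy())  # small: prediction close to the mask
print(bce_dice_loss(y_true, y_pred_bad).numpy())   # much larger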
optimizer = tf.keras.optimizers.Adam(1e-4)
if model_name == 'ed_model':
    print('select the Encoder-Decoder model')
    model = ed_model
if model_name == 'u-net':
    print('select the U-Net model')
    model = unet_model
select the U-Net model
if not tf.io.gfile.exists(checkpoint_dir):
    tf.io.gfile.makedirs(checkpoint_dir)
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
if is_train:
    checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                     model=model)
else:
    checkpoint = tf.train.Checkpoint(model=model)
## Define print function
def print_images():
    for test_images, test_labels in test_dataset.take(1):
        predictions = model(test_images, training=False)
        plt.figure(figsize=(10, 20))
        plt.subplot(1, 3, 1)
        plt.imshow(test_images[0, :, :, :])
        plt.title("Input image")
        plt.subplot(1, 3, 2)
        plt.imshow(test_labels[0, :, :, 0])
        plt.title("Actual Mask")
        plt.subplot(1, 3, 3)
        plt.imshow(predictions[0, :, :, 0])
        plt.title("Predicted Mask")
        plt.show()
# save loss values for plotting
loss_history = []
global_step = 0  # global step counter (optional)
print_steps = 10  # how often to log/plot inside the tf.GradientTape training loop
save_epochs = 1  # how often (in epochs) to save a checkpoint
for epoch in range(max_epochs):
    for images, labels in train_dataset:  # data-loading part
        start_time = time.time()
        global_step = global_step + 1

        with tf.GradientTape() as tape:  # model-training part
            predictions = model(images, training=True)  # images: [batch_size, 256, 256, 3]
            # predictions: [batch_size, 256, 256, 1]
            loss = bce_dice_loss(labels, predictions)  # labels: [batch_size, 256, 256, 1]

        # weight-update part
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        # print training status
        epochs = global_step * batch_size / float(num_train_examples)
        duration = time.time() - start_time
        if global_step % print_steps == 0:
            clear_output(wait=True)
            examples_per_sec = batch_size / float(duration)
            print("Epochs: {:.2f} global_step: {} loss: {:.3f} ({:.2f} examples/sec; {:.3f} sec/batch)".format(
                epochs, global_step, loss, examples_per_sec, duration))
            loss_history.append([epoch, loss])

            # print a sample image
            for test_images, test_labels in test_dataset.take(1):
                predictions = model(test_images, training=False)
                plt.figure(figsize=(10, 20))
                plt.subplot(1, 3, 1)
                plt.imshow(test_images[0, :, :, :])
                plt.title("Input image")
                plt.subplot(1, 3, 2)
                plt.imshow(test_labels[0, :, :, 0])
                plt.title("Actual Mask")
                plt.subplot(1, 3, 3)
                plt.imshow(predictions[0, :, :, 0])
                plt.title("Predicted Mask")
                plt.show()

    # saving (checkpoint) the model periodically
    if (epoch + 1) % save_epochs == 0:
        checkpoint.save(file_prefix=checkpoint_prefix)
loss_history = np.asarray(loss_history)
plt.figure(figsize=(4, 4))
plt.plot(loss_history[:,0], loss_history[:,1])
plt.show()
def mean_iou(y_true, y_pred, num_classes=2):
    # Flatten
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    y_true_f = tf.cast(tf.round(y_true_f), dtype=tf.int32).numpy()
    y_pred_f = tf.cast(tf.round(y_pred_f), dtype=tf.int32).numpy()

    # calculate confusion matrix
    labels = list(range(num_classes))
    current = confusion_matrix(y_true_f, y_pred_f, labels=labels)

    # compute mean IoU
    intersection = np.diag(current)
    ground_truth_set = current.sum(axis=1)
    predicted_set = current.sum(axis=0)
    union = ground_truth_set + predicted_set - intersection
    IoU = intersection / union.astype(np.float32)
    return np.mean(IoU)
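A quick hand-checkable example for mean_iou (toy tensors, not dataset outputs):
yt = tf.constant([0., 0., 1., 1.])
yp = tf.constant([0.1, 0.9, 0.8, 0.7])  # rounds to [0, 1, 1, 1]: one false positive
# class 0: intersection 1, union 2 -> IoU 0.5; class 1: intersection 2, union 3 -> IoU ~0.667
print(mean_iou(yt, yp))  # ~0.583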
# returns the path of the latest checkpoint; to load saved weights, pass it to checkpoint.restore(...)
tf.train.latest_checkpoint(checkpoint_dir)
#mean = tf.keras.metrics.Mean("mean_iou")
mean = []
for images, labels in test_dataset:
    predictions = model(images, training=False)
    m = mean_iou(labels, predictions)
    mean.append(m)
mean = np.array(mean)
mean = np.mean(mean)
print("mean_iou: {}".format(mean))
#print("mean_iou: {}".format(mean.result().numpy()))
# mean_iou: 0.8776053163888694
## Define print function
def print_images():
    for test_images, test_labels in test_dataset.take(1):
        predictions = model(test_images, training=False)
        for i in range(batch_size):
            plt.figure(figsize=(10, 20))
            plt.subplot(1, 3, 1)
            plt.imshow(test_images[i, :, :, :])
            plt.title("Input image")
            plt.subplot(1, 3, 2)
            plt.imshow(test_labels[i, :, :, 0])
            plt.title("Actual Mask")
            plt.subplot(1, 3, 3)
            plt.imshow(predictions[i, :, :, 0])
            plt.title("Predicted Mask")
            plt.show()
print_images()