아래 이미지처럼 폐부위에 희미한 그림자?같은게 보이는데 사실 이 사진만으로 확실히 폐렴이다 아니다 판단하기 어렵습니다. 그래서 CNN모델을 이용해 구분을 해주는 모델을 만들어 보겠습니다.
import os, re
import random, math
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
# 데이터 로드할 때 빠르게 로드할 수 있도록하는 설정 변수
AUTOTUNE = tf.data.experimental.AUTOTUNE
# X-RAY 이미지 사이즈 변수
IMAGE_SIZE = [180, 180]
# 데이터 경로 변수
ROOT_PATH = os.path.join(os.getenv('HOME'), 'aiffel')
TRAIN_PATH = ROOT_PATH + '/chest_xray/data/train/*/*'
VAL_PATH = ROOT_PATH + '/chest_xray/data/val/*/*'
TEST_PATH = ROOT_PATH + '/chest_xray/data/test/*/*'
# 프로젝트를 진행할 때 아래 두 변수를 변경해보세요
BATCH_SIZE = 30
EPOCHS = 50
print(ROOT_PATH)
데이터에는
train_filenames = tf.io.gfile.glob(TRAIN_PATH)
test_filenames = tf.io.gfile.glob(TEST_PATH)
val_filenames = tf.io.gfile.glob(VAL_PATH)
print(len(train_filenames))
print(len(test_filenames))
print(len(val_filenames))
5216
624
16
val데이터가 16개로 적기 때문에 train에서 val으로 사용할 데이터 셋을 8:2분리를 하겠습니다.
# train 데이터와 validation 데이터를 모두 filenames에 담습니다
filenames = tf.io.gfile.glob(TRAIN_PATH)
filenames.extend(tf.io.gfile.glob(VAL_PATH))
# 모아진 filenames를 8:2로 나눕니다
train_size = math.floor(len(filenames)*0.8)
random.shuffle(filenames)
train_filenames = filenames[:train_size]
val_filenames = filenames[train_size:]
print(len(train_filenames))
print(len(val_filenames))
4185
1047
# 정상 이미지와 폐렴 이미지를 구분해야 하는데
# 다행히도 파일 경로에 힌트가 있어요
print(f'Normal image path\n{filenames[0]}')
print(f'Pneumonia image path\n{filenames[2000]}')
Normal image path
/aiffel/aiffel/chest_xray/data/train/NORMAL/IM-0717-0001.jpeg
Pneumonia image path
/aiffel/aiffel/chest_xray/data/train/PNEUMONIA/person803_bacteria_2710.jpeg
# 이제 파일 경로에 NORMAL이 있는지 확인하면 정상 이미지를 셀 수 있습니다.
COUNT_NORMAL = len([filename for filename in train_filenames if "NORMAL" in filename])
print(f"Normal images count in training set: {COUNT_NORMAL}")
# 파일 경로에 PNEUMONIA이 있는지 확인하면 폐렴 이미지도 셀 수 있습니다.
COUNT_PNEUMONIA = len([filename for filename in train_filenames if "PNEUMONIA" in filename])
print(f"Pneumonia images count in training set: {COUNT_PNEUMONIA}")
Normal images count in training set: 1081
Pneumonia images count in training set: 3104
train_list_ds = tf.data.Dataset.from_tensor_slices(train_filenames)
val_list_ds = tf.data.Dataset.from_tensor_slices(val_filenames)
TRAIN_IMG_COUNT = tf.data.experimental.cardinality(train_list_ds).numpy()
print(f"Training images count: {TRAIN_IMG_COUNT}")
VAL_IMG_COUNT = tf.data.experimental.cardinality(val_list_ds).numpy()
print(f"Validating images count: {VAL_IMG_COUNT}")
Training images count: 4185
Validating images count: 1047
파일 경로에 'NORMAL'이나 'PNEUMONIA'가 포함되어 있기 때문에 이를 이용해서 라벨 데이터를 만들어 주는 함수를 만들어 보겠습니다.
# 파일 경로의 끝에서 두번째 부분을 확인하면 양성과 음성을 구분할 수 있습니다
def get_label(file_path):
parts = tf.strings.split(file_path, os.path.sep)
return parts[-2] == "PNEUMONIA" # 폐렴이면 양성(True), 노말이면 음성(False)
이미지의 사이즈를 통일 시키고 GPU 메모리를 더욱 효율적으로 사용하기 위해 이미지 사이즈를 줄여봅시다.
# 이미지를 알맞은 형식으로 바꿉니다.
def decode_img(img):
img = tf.image.decode_jpeg(img, channels=3) # 이미지를 uint8 tensor로 수정
img = tf.image.convert_image_dtype(img, tf.float32) # float32 타입으로 수정
img = tf.image.resize(img, IMAGE_SIZE) # 이미지 사이즈를 IMAGE_SIZE로 수정
return img
# 이미지 파일의 경로를 입력하면 이미지와 라벨을 읽어옵니다.
def process_path(file_path):
label = get_label(file_path) # 라벨 검출
img = tf.io.read_file(file_path) # 이미지 읽기
img = decode_img(img) # 이미지를 알맞은 형식으로 수정
return img, label
train 데이터 셋과 validation 데이터 셋을 만듭니다.
train_ds = train_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
val_ds = val_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
이미지가 잘 들어갔는지 확인해보겠습니다.
for image, label in train_ds.take(1):
print("Image shape: ", image.numpy().shape)
print("Label: ", label.numpy())
Image shape: (180, 180, 3)
Label: False
위 작업은 train과 val만 했다면 test도 동일한 작업을 해줍니다!
test_list_ds = tf.data.Dataset.list_files(TEST_PATH)
TEST_IMAGE_COUNT = tf.data.experimental.cardinality(test_list_ds).numpy()
test_ds = test_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
test_ds = test_ds.batch(BATCH_SIZE)
print(TEST_IMAGE_COUNT)
624
위에서 Tensorflow에 data로 형성하고 효육적으로 사용하게 했습니다.
def augment(image,label):
image = tf.image.random_flip_left_right(image) # 랜덤하게 좌우를 반전합니다.
return image,label
def prepare_for_training(ds, shuffle_buffer_size=1000):
# augment 적용 부분이 배치처리 함수에 추가되었습니다.
ds = ds.map(
augment, # augment 함수 적용
num_parallel_calls=2
)
ds = ds.shuffle(buffer_size=shuffle_buffer_size)
ds = ds.repeat()
ds = ds.batch(BATCH_SIZE)
ds = ds.prefetch(buffer_size=AUTOTUNE)
return ds
train_ds = prepare_for_training(train_ds)
val_ds = prepare_for_training(val_ds)
데이터를 보기 위해 먼저, train에 있는 batch 중 첫 번째 배치를 추출합니다. 추출된 배치를 image와 label 데이터 셋으로 나눕니다.
# 이미지 배치를 입력하면 여러장의 이미지를 보여줍니다.
def show_batch(image_batch, label_batch):
plt.figure(figsize=(10,10))
for n in range(BATCH_SIZE):
ax = plt.subplot(5,math.ceil(BATCH_SIZE/5),n+1)
plt.imshow(image_batch[n])
if label_batch[n]:
plt.title("PNEUMONIA")
else:
plt.title("NORMAL")
plt.axis("off")
image_batch, label_batch = next(iter(train_ds))
show_batch(image_batch.numpy(), label_batch.numpy())
def conv_block(filters):
block = tf.keras.Sequential([
tf.keras.layers.SeparableConv2D(filters, 3, activation='relu', padding='same'),
tf.keras.layers.SeparableConv2D(filters, 3, activation='relu', padding='same'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.MaxPool2D()
])
return block
def dense_block(units, dropout_rate):
block = tf.keras.Sequential([
tf.keras.layers.Dense(units, activation='relu'),
tf.keras.layers.BatchNormalization(),
tf.keras.layers.Dropout(dropout_rate)
])
return block
def build_model():
model = tf.keras.Sequential([
tf.keras.Input(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3)),
tf.keras.layers.Conv2D(16, 3, activation='relu', padding='same'),
tf.keras.layers.Conv2D(16, 3, activation='relu', padding='same'),
tf.keras.layers.MaxPool2D(),
conv_block(32),
conv_block(64),
conv_block(128),
tf.keras.layers.Dropout(0.2),
conv_block(256),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Flatten(),
dense_block(512, 0.7),
dense_block(128, 0.5),
dense_block(64, 0.3),
tf.keras.layers.Dense(1, activation='sigmoid')
])
return model
아까 데이터의 라벨양을 보면 폐렴의 이미지가 너무 많습니다. 그래서 각 라벨에 가중치를 주어 언벨런스한 부분을 어느정도 해결하겠습니다.
weight_for_0 = (1 / COUNT_NORMAL)*(TRAIN_IMG_COUNT)/2.0
weight_for_1 = (1 / COUNT_PNEUMONIA)*(TRAIN_IMG_COUNT)/2.0
class_weight = {0: weight_for_0, 1: weight_for_1}
print('Weight for NORMAL: {:.2f}'.format(weight_for_0))
print('Weight for PNEUMONIA: {:.2f}'.format(weight_for_1))
Weight for NORMAL: 1.94
Weight for PNEUMONIA: 0.67
with tf.device('/GPU:0'):
model = build_model()
METRICS = [
'accuracy',
tf.keras.metrics.Precision(name='precision'),
tf.keras.metrics.Recall(name='recall')
]
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=METRICS
)
from keras.callbacks import EarlyStopping
# 최고의 정확도를 가질때 멈춰주는 함수 추가
es=EarlyStopping(monitor='val_loss',mode='min',verbose=1,patience=10)
with tf.device('/GPU:0'):
history = model.fit(
train_ds,
steps_per_epoch=TRAIN_IMG_COUNT // BATCH_SIZE,
epochs=EPOCHS,callbacks=[es],
validation_data=val_ds,
validation_steps=VAL_IMG_COUNT // BATCH_SIZE,
class_weight=class_weight,
)
위 모델 결과를 바탕으로 그래프로 Epochs마다 모델의 precision, recall, accuracy, loss의 변화를 보겠습니다.
fig, ax = plt.subplots(1, 4, figsize=(20, 3))
ax = ax.ravel()
for i, met in enumerate(['precision', 'recall', 'accuracy', 'loss']):
ax[i].plot(history.history[met])
ax[i].plot(history.history['val_' + met])
ax[i].set_title('Model {}'.format(met))
ax[i].set_xlabel('epochs')
ax[i].set_ylabel(met)
ax[i].legend(['train', 'val'])
test데이터로 모델을 평가해봅니다.
loss, accuracy, precision, recall = model.evaluate(test_ds)
print(f'Loss: {loss},\nAccuracy: {accuracy},\nPrecision: {precision},\nRecall: {recall}')
위 결과는 recall값이 매우 작네요! 의료데이터는 실제 폐렴인 환자를 정상으로 판단하면 매우 위험하기 때문에 recall값을 올려야합니다. recall값이 높아 질 수 있도록 다시 학습을 해보겠습니다.
위에서 의료데이터를 좌우반전도 해서 학습시키는 작업을 했습니다. 하지만 X ray사진을 찍을때 항상 같은 조건으로 촬영을 합니다. 그래서 좌우반전 작업을 삭제하고 다시해보겠습니다.
test_list_ds = tf.data.Dataset.list_files(TEST_PATH)
TEST_IMAGE_COUNT = tf.data.experimental.cardinality(test_list_ds).numpy()
test_ds = test_list_ds.map(process_path, num_parallel_calls=AUTOTUNE)
test_ds = test_ds.batch(BATCH_SIZE)
좌우반전 함수 삭제
def prepare_for_training(ds, shuffle_buffer_size=1000):
ds = ds.shuffle(buffer_size=shuffle_buffer_size)
ds = ds.repeat()
ds = ds.batch(BATCH_SIZE)
ds = ds.prefetch(buffer_size=AUTOTUNE)
return ds
train_ds = prepare_for_training(train_ds)
val_ds = prepare_for_training(val_ds)
conv2D레이어 256 --> 128 -->64-->128-->256 등의 형태로 변경
def build_model():
model = tf.keras.Sequential([
tf.keras.Input(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1], 3)),
tf.keras.layers.Conv2D(128, 3, activation='relu', padding='same'),
tf.keras.layers.Conv2D(64, 3, activation='relu', padding='same'),
tf.keras.layers.MaxPool2D(),
conv_block(64),
conv_block(64),
conv_block(128),
tf.keras.layers.Dropout(0.2),
conv_block(256),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Flatten(),
dense_block(512, 0.7),
dense_block(128, 0.5),
dense_block(64, 0.3),
tf.keras.layers.Dense(1, activation='sigmoid')
])
return model
with tf.device('/GPU:0'):
model = build_model()
METRICS = [
'accuracy',
tf.keras.metrics.Precision(name='precision'),
tf.keras.metrics.Recall(name='recall')
]
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=METRICS
)
BATCH_SIZE 50 -->16으로 변경
BATCH_SIZE = 30
EPOCHS = 50
earlystopping은 val precision을 기준으로 멈추겠습니다!
from keras.callbacks import EarlyStopping
# 최고의 정확도를 가질때 멈춰주는 함수 추가
es=EarlyStopping(monitor='val_precision',mode='max',verbose=1,patience=5)
with tf.device('/GPU:0'):
history = model.fit(
train_ds,
steps_per_epoch=TRAIN_IMG_COUNT // BATCH_SIZE,
epochs=EPOCHS,callbacks=[es],
validation_data=val_ds,
validation_steps=VAL_IMG_COUNT // BATCH_SIZE,
class_weight=class_weight,
)
loss, accuracy, precision, recall = model.evaluate(test_ds)
print(f'Loss: {loss},\nAccuracy: {accuracy},\nPrecision: {precision},\nRecall: {recall}')
아까 위의 21에폭이나 하고 Recall: 0.8897435665130615 보다는 약간 변형을 준 바로 위 모델이 recall값이 더 좋은 결과를 가지고 왔습니다. 사실 recall을 99까지 올릴수 있지만 그렇게 하면 다른 precision이나 accuracy값이 매우 낮아서... 약간 포기해봤습니다.
BATCH_SIZE = 16
EPOCHS = 80
from keras.callbacks import EarlyStopping
# 최고의 정확도를 가질때 멈춰주는 함수 추가
es=EarlyStopping(monitor='val_loss',mode='min',verbose=1,patience=20)
with tf.device('/GPU:0'):
history = model.fit(
train_ds,
steps_per_epoch=TRAIN_IMG_COUNT // BATCH_SIZE,
epochs=EPOCHS,callbacks=[es],
validation_data=val_ds,
validation_steps=VAL_IMG_COUNT // BATCH_SIZE,
class_weight=class_weight,
)
loss, accuracy, precision, recall = model.evaluate(test_ds)
print(f'Loss: {loss},\nAccuracy: {accuracy},\nPrecision: {precision},\nRecall: {recall}')
early stop을 val_loss로 설정하는게 고루고루 좋은 값을 보이는 것 같습니다. 의료데이터는 recall값을 중요시 하기때문에 다른건 무시하고 recall값을 100%에 가깝게 하겠다! 하면 early stop을 recall로 설정해서 모델을 구축하는게 더 좋은 모델일 수도 있겠네요!! 인공지능 특성상 100% 정확도를 가지는 모델을 만들기 힘들다보니 하나를 얻으려면 다른것을 잃어야하는건 어쩔수없나봅니다 ㅜㅜ 튼 그렇습니다! 여러 하이퍼파라미터를 바꿔보면서 해본 실험이였습니다.!