[CNN-LSTM 3] input data가 이미지인 CNN-LSTM 모델 - video classification 중심으로2

SeomIII·2022년 4월 27일
0

SONSU

목록 보기
9/29
post-custom-banner

📝 발표 이후 5월이 시작되기 전까지 꼭 cnn-lstm 모델을 한번 돌려보는 것이 목표이다!

📝 잠시 멈춰있던 vgg16, inception v3 코드비교를 통해 inception v3을 활용한 cnn-lstm 모델 구현을 다시 시작한다.

📌Inception V3

!wget -q https://git.io/JGc31 -O ucf101_top5.tar.gz
!tar xf ucf101_top5.tar.gz
  • 데이터 - UCF101데이터 셋의 서브샘플링된 버전을 사용한다.
  • 우리는 우리의 데이터 셋을 이용할 것이기 때문에 필요없다.
from tensorflow_docs.vis import embed
from tensorflow import keras
from imutils import paths

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import os
  • 필요한 라이브러리 import
IMG_SIZE = 224
BATCH_SIZE = 64
EPOCHS = 10

MAX_SEQ_LENGTH = 20
NUM_FEATURES = 2048
  • 매개변수 정의
train_df = pd.read_csv("train.csv")
test_df = pd.read_csv("test.csv")

print(f"Total videos for training: {len(train_df)}")
print(f"Total videos for testing: {len(test_df)}")

train_df.sample(10)
  • video의 제목과 라벨링이 되어있는 csv 파일로 추정된다. 총 traing,test에 사용될 영상의 개수를 파악하기 위한 코드이다.
    train_df.sample(10) 을 수행하면 10개의 샘플을 출력해준다. (영상의 이름과 라벨이 정리되어있는 표로)
  • 이 부분은 우리의 코드에 필요없다.
def crop_center_square(frame):
    y, x = frame.shape[0:2]
    min_dim = min(y, x)
    start_x = (x // 2) - (min_dim // 2)
    start_y = (y // 2) - (min_dim // 2)
    return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]


def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = crop_center_square(frame)
            frame = cv2.resize(frame, resize)
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)

            if len(frames) == max_frames:
                break
    finally:
        cap.release()
    return np.array(frames)
  • crop_center_square (frame)
  • load_video(path,max_frames=0,resize=(IMG_SIZE,IMG_SIZE))
    : 영상 데이터를 가져와서 프레임을 추출하는 과정으로 보인다.
def build_feature_extractor():
    feature_extractor = keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )
    preprocess_input = keras.applications.inception_v3.preprocess_input

    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    preprocessed = preprocess_input(inputs)

    outputs = feature_extractor(preprocessed)
    return keras.Model(inputs, outputs, name="feature_extractor")


feature_extractor = build_feature_extractor()
  • buid_feature_extractor()
    : Inception V3모델을 사용
label_processor = keras.layers.StringLookup(
    num_oov_indices=0, vocabulary=np.unique(train_df["tag"])
)
print(label_processor.get_vocabulary())
  • StringLookup을 이용하여 문자열 라벨을 정수형식으로 변환한다./ 계층 레이블을 정수로 인코딩한다.
def prepare_all_videos(df, root_dir):
    num_samples = len(df)
    video_paths = df["video_name"].values.tolist()
    labels = df["tag"].values
    labels = label_processor(labels[..., None]).numpy()

    # `frame_masks` and `frame_features` are what we will feed to our sequence model.
    # `frame_masks` will contain a bunch of booleans denoting if a timestep is
    # masked with padding or not.
    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    # For each video.
    for idx, path in enumerate(video_paths):
        # Gather all its frames and add a batch dimension.
        frames = load_video(os.path.join(root_dir, path))
        frames = frames[None, ...]

        # Initialize placeholders to store the masks and features of the current video.
        temp_frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
        temp_frame_features = np.zeros(
            shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
        )

        # Extract features from the frames of the current video.
        for i, batch in enumerate(frames):
            video_length = batch.shape[0]
            length = min(MAX_SEQ_LENGTH, video_length)
            for j in range(length):
                temp_frame_features[i, j, :] = feature_extractor.predict(
                    batch[None, j, :]
                )
            temp_frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

        frame_features[idx,] = temp_frame_features.squeeze()
        frame_masks[idx,] = temp_frame_mask.squeeze()

    return (frame_features, frame_masks), labels


train_data, train_labels = prepare_all_videos(train_df, "train")
test_data, test_labels = prepare_all_videos(test_df, "test")

print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")
  • 영상데이터에서 데이터 셋을 생성하고, inception v3를 거친다.
  • num_samples=len(df) -> df는 csv 파일로, 데이터 셋의 개수를 말하는 것으로 추정됨.
🥲제일 난관이 될 부분으로 예측중,,

def get_sequence_model():
    class_vocab = label_processor.get_vocabulary()

    frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

    # Refer to the following tutorial to understand the significance of using `mask`:
    # https://keras.io/api/layers/recurrent_layers/gru/
    x = keras.layers.GRU(16, return_sequences=True)(
        frame_features_input, mask=mask_input
    )
    x = keras.layers.GRU(8)(x)
    x = keras.layers.Dropout(0.4)(x)
    x = keras.layers.Dense(8, activation="relu")(x)
    output = keras.layers.Dense(len(class_vocab), activation="softmax")(x)
    # len(class_vocab) 은 라벨의 숫자를 말하는 것 같으니, 분류되는 레이블 값을 넣어도 될것같다고 판단. (ex. 3개 단어 분류 = 3 으로 입력)

    rnn_model = keras.Model([frame_features_input, mask_input], output)

    rnn_model.compile(
        loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )
    return rnn_model


# Utility for running experiments.
def run_experiment():
    filepath = "/tmp/video_classifier"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    seq_model = get_sequence_model()
    history = seq_model.fit(
        [train_data[0], train_data[1]],
        train_labels,
        validation_split=0.3,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )

    seq_model.load_weights(filepath)
    _, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, seq_model


_, sequence_model = run_experiment()
  • get_sequence_model()
    : rnn 모델 구축
    : class_vocab = label_processor.get_vocabulary() -> 라벨을 가져옴

    len(class_vocab) 은 라벨의 숫자를 말하는 것 같으니, 분류되는 레이블 값을 넣어도 될것같다고 판단. (ex. 3개 단어 분류 = 3 으로 입력)

  • run_experiment()
    : 모델 학습
def prepare_single_video(frames):
    frames = frames[None, ...]
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(MAX_SEQ_LENGTH, video_length)
        for j in range(length):
            frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
        frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

    return frame_features, frame_mask


def sequence_prediction(path):
    class_vocab = label_processor.get_vocabulary()

    frames = load_video(os.path.join("test", path))
    frame_features, frame_mask = prepare_single_video(frames)
    probabilities = sequence_model.predict([frame_features, frame_mask])[0]

    for i in np.argsort(probabilities)[::-1]:
        print(f"  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")
    return frames


# This utility is for visualization.
# Referenced from:
# https://www.tensorflow.org/hub/tutorials/action_recognition_with_tf_hub
def to_gif(images):
    converted_images = images.astype(np.uint8)
    imageio.mimsave("animation.gif", converted_images, fps=10)
    return embed.embed_file("animation.gif")


test_video = np.random.choice(test_df["video_name"].values.tolist())
print(f"Test video path: {test_video}")
test_frames = sequence_prediction(test_video)
to_gif(test_frames[:MAX_SEQ_LENGTH])
  • prepare_single_video(frames)
    : 새로 들어온 영상을 데이터 셋화 시키고, inception v3를 거치는 함수
  • sequence_prediction(path)
    : 실제로 영상을 넣고, 결과를 보는 함수
  • to_gif(images)
    : gif 형태로 바꾸기

📌 VGG16

  • 데이터 셋 구성
    -> 함수 이름 : bring_data_from_directory()
    -> ImageDataGenerator 사용
    -> train_generator, validation_generator
def bring_data_from_directory():
  datagen = ImageDataGenerator(rescale=1. / 255)
  train_generator = datagen.flow_from_directory(
          'train',
          target_size=(224, 224),
          batch_size=batch_size,
          class_mode='categorical',  # this means our generator will only yield batches of data, no labels
          shuffle=True,
          classes=['class_1','class_2','class_3','class_4','class_5'])

  validation_generator = datagen.flow_from_directory(
          'validate',
          target_size=(224, 224),
          batch_size=batch_size,
          class_mode='categorical',  # this means our generator will only yield batches of data, no labels
          shuffle=True,
          classes=['class_1','class_2','class_3','class_4','class_5'])
  return train_generator,validation_generator

🌟 전이학습(Transfer Learning)

  • 전이학습은 특정 분야에서 학습된 신경망의 일부 능력을 유사하거나 전혀 새로운 분야에서 사용되는 신경망의 학습에 이용하는 것을 의미한다.
  • 수만~수천만장의 이미지를 통해 학습된 높은 성능을 갖는 Resnet 이나, VGG 신경망의 특징 추출능력을 그대로 이용하고, 마지막 출력 계층으로써 주로 선형 레이어만을 변경하여 이 변경된 레이어만을 재학습시키는 것이 전이학습이다.
  • 필요한 특정 이미지 세트로 CNN의 마지막 층만 다시 학습할 수 있다.
  • 전이 학습은 학습데이터 수가 적을때도 효과적이며, 학습 속도도 빠르다.

새로운 사례

CNN-LSTM 모델을 활용한 실시간 운전자 행위 식별 시스템

출처
https://blog.coast.ai/five-video-classification-methods-implemented-in-keras-and-tensorflow-99cad29cc0b5
전이학습 - TensorFlow를 사용한 딥러닝 모바일 프로젝트 / Jeff Tange지음/ 홍릉과학출판사

profile
FE Programmer
post-custom-banner

0개의 댓글