왜 이 태스크를 Word Window Classification이라고 부를까?
따라서 주변 단어도 인식해야 하므로 window를 사용한다.
먼저 파이토치 기본 모듈을 불러온다.
import torch
import torch.nn as nn
NLP에서 코퍼스는 일반적으로 .txt또는 .csv파일이다.
이번 토이 프로젝트에서는 데이터와 레이블을 파이썬 리스트로 이미 읽었다고 가정하자.
corpus = [
"We always come to Paris",
"The professor is from Australia",
"I live in Stanford",
"He comes from Taiwan",
"The capital of turkey is Ankara"
]
학습의 용이를 위해 일반적인 자연어 데이터 전처리 단계가 있다.
어떤 전처리를 할 지는 태스크에 따라 다르다.
어떤 태스크에서는 특수 문자 제거가 유용하지만 다른 태스크에서는 제거하지 않는게 유용할 수 있다.
예를 들어 여러 언어를 다루는 경우는 제거하지 않는게 유용하다.
이번 프로젝트에서는 단어를 소문자로 변환하고 토큰화하는 단계만 거치도록 하자.
def preprocess_sentence(sentence):
return sentence.lower().split()
train_sentence = [preprocess_sentence(sent) for sent in corpus]
train_sentence
[['we', 'always', 'come', 'to', 'paris'],
['the', 'professor', 'is', 'from', 'australia'],
['i', 'live', 'in', 'stanford'],
['he', 'comes', 'from', 'taiwan'],
['the', 'capital', 'of', 'turkey', 'is', 'ankara']]
모델의 목표는 위치에 해당하는 단어를 판별하는 것이다.
그러므로 문장의 단어가 위치를 의미하는지, 그렇지 않은지 레이블이 있어야 한다.
따라서 위치에 해당하는 단어는 1을, 그렇지 않은 단어는 0을 출력하도록 한다.
먼저 학습 데이터에 대해 레이블을 만든다.
locations = set(["australia", "ankara", "paris", "stanford", "taiwan", "turkey"])
train_labels = [[1 if word in locations else 0 for word in sent] for sent in train_sentences]
train_labels
[[0, 0, 0, 0, 1],
[0, 0, 0, 0, 1],
[0, 0, 0, 1],
[0, 0, 0, 1],
[0, 0, 0, 1, 0, 1]]
머신러닝 모델은 벡터를 입력을 받으므로 자연어를 그대로 입력할 수 없다.
이때 임베딩을 사용하면 단어를 벡터로 변환할 수 있다.
이 섹션에서는 단어를 어떻게 임베딩으로 변환하는지 살펴보겠다.
단어를 임베딩으로 변환하기 위해선 인덱스가 필요하다.
다시 말해 단어를 인덱스에 매핑한 뒤 임베딩으로 변환한다.
단어를 왜 굳이 인덱스에 매핑해야될까?
[Parameter containing:
tensor([[ 0.6592, 1.7533, -0.4539, -0.8667, 0.5120],
[-1.0848, -0.3810, -1.2999, -0.0715, 0.7878],
[-2.0299, 0.6524, -1.1977, -0.0920, 1.2706]],
requires_grad=True)]
위 같은 임베딩이 있다고 생각해보자.
이 임베딩 테이블을 조회하고자 할 때 단어로는 접근할 수 없다.
첫 번째 벡터는 0번 벡터이고 두 번째 벡터는 1번 벡터, 세 번째 벡터는 2번 벡터이다.
이렇게 숫자 인덱스를 먼저 만들어야 임베딩 테이블을 조회할 수 있으므로 단어를 인덱스에 매핑해야 한다.
정리하자면
어휘 모음집에서 단어의 인덱스 를 찾는다. (
임베딩 테이블에서 인덱스 를 이용해 임베딩을 가져온다. (
그럼 먼저 단어에 인덱스를 매핑해야 한다.
코퍼스에서 모든 고유 단어를 찾고 각 단어에 인덱스를 할당하여 매핑할 수 있다.
vocabulary = set(w for s in train_sentences for w in s)
vocabulary
{'always',
'ankara',
'australia',
'capital',
'come',
'comes',
'from',
'he',
'i',
'in',
'is',
'live',
'of',
'paris',
'professor',
'stanford',
'taiwan',
'the',
'to',
'turkey',
'we'}
코퍼스에 있는 모든 단어를 vocabulary에 넣었다.
이것을 '어휘 모음집'이라고 부르겠다.
vocabulary에 추가할 토큰들이 있다.
<unk>
테스트 때 어휘 모음집에 없는 미지의 단어가 발견될 수 있다.
예측은 인접 단어, 즉 문맥을 고려하므로 미지의 단어를 표현할 수만 있다면 모델은 그 단어가 위치인지 아닌지 추론할 수 있을 것이다.
따라서 미지의 단어를 처리하기 위한 특수한 토큰을 만들어야 하는데 이 토큰이 <unk> 토큰이다.
이 토큰을 어휘 모음집에 추가한다.
vocabulary.add("<unk>")
<pad>
태스크 이름을 다시 떠올려 보자.
Word Window Classificaiton 이다.
예측에 window를 이용해야 한다.
window를 이용한다는 것은 중심 단어의 앞과 뒤를 고려하여 문맥을 파악하겠다는 것이다.
예를 들어 문장 에서 window size가 1이라면 Paris의 바로 앞 뒤 단어를 고려해야 한다.
Paris의 앞 단어는 to인데 뒷 단어는 없다.
이를 방지하기 위해 패딩을 해서 모든 단어 주변에 유효한 윈도우가 있도록 해야한다.
따라서 <pad>라는 특수한 토큰을 도입한다.
window size가 2이고 예시 문장 를 패딩하면
['<pad>', '<pad>', 'We', 'always', 'come', 'to', 'Paris', '<pad>', '<pad>']처럼 변환될 것이다.
<pad> 토큰을 어휘 모음집에 추가하고 window size에 맞게 문장에 패딩 토큰을 추가하는 함수도 하나 만들어 보자.
# <pad> 토큰을 vocabulary에 추가하기
vocabulary.add("<pad>")
# 주어진 문장을 패딩하는 함수
def pad_window(sentence, window_size, pad_token="<pad>"):
window = [pad_token] * window_size
return window + sentence + window
# 패딩 예시 확인
window_size = 2
pad_window(train_sentences[0], window_size)
['<pad>', '<pad>', 'we', 'always', 'come', 'to', 'paris', '<pad>', '<pad>']
추가적으로 필요한 토큰은 다 준비되었으니 이제 단어에 인덱스를 배정해보자.
# 리스트 변환 및 정렬
ix_to_word = sorted(list(vocabulary))
# 주어진 단어의 인덱스를 찾기 위한 딕셔너리 만들기
word_to_ix = {word: ind for ind, word in enumerate(ix_to_word)}
word_to_ix
{'<pad>': 0,
'<unk>': 1,
'always': 2,
'ankara': 3,
'australia': 4,
'capital': 5,
'come': 6,
'comes': 7,
'from': 8,
'he': 9,
'i': 10,
'in': 11,
'is': 12,
'live': 13,
'of': 14,
'paris': 15,
'professor': 16,
'stanford': 17,
'taiwan': 18,
'the': 19,
'to': 20,
'turkey': 21,
'we': 22}
먼저 vocabulary를 인덱싱할 수 있도록 리스트로 변환한 뒤 정렬했다.
정렬이 필수는 아니지만 인덱스 사전을 보이기 위해 정렬했다.
패딩 토큰 <pad>도 인덱스를 갖는 것을 확인할 수 있다.
일부 파이토치 함수(예: nn.utils.rnn.pad_sequence가 0을 기본값으로 사용하므로 패딩 토큰을 0으로 설정하는 것이 편리하다.
ix_to_word를 확인해보자.
<unk>는 1에 매핑된 것을 확인할 수 있다.
ix_to_word에서 1번 인덱스를 찾으면 <unk>가 나올 것이다.
ix_to_word[1]
<unk>
이제 학습 데이터의 문장들을 단어 토큰에서 인덱스로 변환할 준비가 되었다.
단어 토큰으로 구성된 문장이 입력되면 토큰과 일치하는 인덱스를 반환하는 함수를 만들고 실행해보자.
# 토큰들의 문장이 주어지면 일치하는 인덱스들 반환
def convert_token_to_indices(sentence, word_to_ix):
indices = []
for token in sentence:
# 토큰이 vocabulary에 존재하는지 확인. 만약 존재하면 인덱스 가져오기
if token in word_to_ix:
index = word_to_ix[token]
# 존재하지 않으면 미지의 토큰의 인덱스 가져오기
else:
index = word_to_ix["<unk>"]
indices.append(index)
return indices
위 함수를 더 간략하게 만들 수도 있다.
def _convert_token_to_indices(sentence, word_to_ix):
return [word_to_ix.get(token, word_to_ix["<unk>"]) for token in sentence]
단어 토큰으로 구성된 예제 문장을 함수에 입력하여 인덱스로 변환된 출력을 확인해보자.
# 예시
example_sentence = ["we", "always", "come", "to", "kuwait"]
example_indices = convert_token_to_indices(example_sentence, word_to_ix)
restored_example = [ix_to_word[ind] for ind in example_indices]
print(f"Original sentence is: {example_sentence}")
print(f"Going from words to indices: {example_indices}")
print(f"Going from indices to words {restored_example}")
Original sentence is: ['we', 'always', 'come', 'to', 'kuwait']
Going from words to indices: [22, 2, 6, 20, 1]
Going from indices to words ['we', 'always', 'come', 'to', '<unk>']
단어 kuwait는 어휘 모음집에 없는 단어이므로 <unk>로 표시된다.
학습 데이터의 문장 전체를 인덱스로 변환해보자.
example_padded_indices = [convert_token_to_indices(s, word_to_ix) for s in train_sentences]
example_padded_indices
[[22, 2, 6, 20, 15],
[19, 16, 12, 8, 4],
[10, 13, 11, 17],
[9, 7, 8, 18],
[19, 5, 14, 21, 12, 3]]
모든 문장을 인덱스로 변환했다.
지금까지 우리가 한 것을 다시 한번 정리해보자.
학습을 위해선 단어를 임베딩으로 변환해야 한다.
먼저 단어를 인덱스로 변환해야 하는데 테스트에서 미지의 단어가 나오거나 window size에 문제가 있을 수 있다.
미지의 단어 토큰과 패딩 토큰을 어휘 모음집에 추가한다.
단어를 모두 인덱스로 변환한다.
이제 단어 임베딩을 만들면 된다.
단어 임베딩을 만든 후에는 인덱스에 해당하는 임베딩 벡터를 불러올 수 있는지 확인한다.
임베딩을 만들어보자.
파이토치에서는 nn.Embedding 클래스를 사용하여 임베딩 테이블을 생성할 수 있다.
nn.Embedding(num_words, embedding_dimension)과 같이 호출하는데 여기서 num_words는 어휘 모음집에 있는 단어의 개수, embedding_dimension은 내가 원하는 임베딩 차원이다.
nn.Embedding은 그렇게 특별한 것은 아니고 학습 가능한 N x E 차원 텐서를 감싸는 래퍼 클래스일 뿐이다.
네트워크를 학습시키면서 기울기가 임베딩 계층까지 역전파되므로 단어 임베딩 또한 업데이트된다.
모델에 사용할 임베딩 계층을 모델에서 초기화할 거지만 여기서 예시로 먼저 확인해보자.
# 단어들에 대한 임베딩 테이블 만들기
embedding_dim = 5
embeds = nn.Embedding(len(vocabulary), embedding_dim)
# 임베딩 테이블의 파라미터 출력
list(embeds.parameters())
[Parameter containing:
tensor([[ 0.6592, 1.7533, -0.4539, -0.8667, 0.5120],
[-1.0848, -0.3810, -1.2999, -0.0715, 0.7878],
[-2.0299, 0.6524, -1.1977, -0.0920, 1.2706],
[ 0.6724, -0.9402, 0.4818, 1.3432, 0.3305],
[-0.0532, -0.9718, 0.3757, -0.3882, 0.4272],
[ 0.2131, 0.4433, 0.4446, -1.8175, -1.6564],
[ 0.8602, 0.1637, 0.6588, -0.7834, -0.8804],
[ 0.6905, 1.0434, -1.3398, -0.6396, 1.1441],
[ 0.9439, 0.3364, 0.0229, 0.6974, -1.8301],
[-0.7928, 1.4713, 1.0654, 1.2519, 0.7585],
[ 0.0092, 0.2024, 0.1132, 0.2696, 0.8240],
[-1.4816, -2.5928, -1.9196, 0.5323, -1.4616],
[-2.1180, 0.8308, -0.6781, 0.0830, 0.2804],
[ 0.1184, -0.7513, -0.0627, 0.9325, -0.5774],
[ 0.5359, -1.3012, -1.1799, -1.1603, -0.1711],
[-0.0359, 0.9849, -1.3166, -0.8383, 0.6753],
[-0.3996, 0.9822, -0.0192, -0.0232, -0.6426],
[ 0.7611, -0.0053, 1.5956, 0.8071, -0.3724],
[ 0.0803, -0.3528, 1.6485, 0.2509, -0.6873],
[-0.3666, -0.2726, 1.3922, -0.6223, 1.6506],
[-0.3549, -0.3229, 1.2438, 0.2556, -0.3532],
[-0.4088, 0.0299, -0.8228, -1.0568, 0.3317],
[ 1.7594, -0.6873, -0.9532, 0.0617, 0.6470]], requires_grad=True)]
어휘 모음집 크기인 23, 임베딩 차원 5에 대한 23 x 5차원의 임베딩이 잘 생성되었다.
이제 인덱스로 조회를 해보자.
단어 paris의 인덱스를 먼저 찾고 이 인덱스에 해당하는 임베딩을 조회하면 된다.
# 단어 Paris의 임베딩 가져오기
index = word_to_ix["paris"]
index_tensor = torch.tensor(index, dtype=torch.long)
print("index_tensor is", index_tensor, "\n")
paris_embed = embeds(index_tensor)
print(paris_embed)
index_tensor is tensor(15)
tensor([-0.0359, 0.9849, -1.3166, -0.8383, 0.6753],
grad_fn=<EmbeddingBackward0>)
nn.Embedding 클래스는 Long Tensor 타입의 인덱스 텐서를 기대하므로 조회 텐서를 생성할 때 타입을 지정해주었다.
여러 임베딩을 한번에 가져올 수도 있다.
# 여러 임베딩을 한번에 가져올 수도 있음
index_paris = word_to_ix["paris"]
index_ankara = word_to_ix["ankara"]
indices = [index_paris, index_ankara]
indices_tensor = torch.tensor(indices, dtype=torch.long)
print("indices_tensor is", indices_tensor, "\n")
embeddings = embeds(indices_tensor)
print(embeddings)
indices_tensor is tensor([15, 3])
tensor([[-0.0359, 0.9849, -1.3166, -0.8383, 0.6753],
[ 0.6724, -0.9402, 0.4818, 1.3432, 0.3305]],
grad_fn=<EmbeddingBackward0>)
배치는 모델이 한 에포크의 학습을 할 때 데이터를 몇 개로 나누어 처리할 지 결정한다.
예를 들어 100개의 데이터에 대해 배치를 10으로 설정하면 한 에포크의 학습을 수행할 때 10개씩 10번 학습이 수행된다.
이 경우 파라미터 업데이트 또한 한 에포크 당 10번 수행된다.
배치를 왜 사용하는 걸까?
모든 데이터를 한번에 입력하는 풀 배치의 경우 한 에포크의 학습에 모든 데이터를 한번에 넣어 한번의 손실을 계산한다.
그러니까 한 에포크 당 한번에 파라미터 업데이트가 일어나는 것인데 이 방식은 계산 비용이 많이 든다.
이와 반대로 배치 크기를 1로 설정하여 각 샘플마다 파라미터르 업데이트하면 손실 값이 매 스텝마다 불안정하게 변할 수 있다.
다시 말해 배치 크기가 데이터 크기와 같은 경우는 계산 비용이 많이 든다는 단점이 있고
한 샘플씩 처리하여 파라미터를 업데이트하는 경우는 손실이 불안정하게 변한다는 단점이 있다.
하지만 데이터를 작은 배치 단위로 학습한 후 파라미터를 업데이트하는 방식, 즉 적절한 배치를 사용하면
계산 비용 문제나 손실 값이 매 스텝마다 불안정하게 변하는 문제를 해결할 수 있다.
이것이 배치를 사용하는 이유다.
좀 더 구체적인 배치의 장점은 계산 효율성과 일반화 성능이다.
배치가 일반화에 이점이 있는 이유는 배치 단위로 학습할 경우 적당한 노이즈가 생기는데
이 노이즈가 기울기 방향을 약간 흔들리게 만들어(근사 기울기)
로컬 미니멈에 갇히지 않고 넓고 평평한 영역의 미니멈을 찾도록 도와준다.
이러한 미니멈은 테스트 데이터에서도 안정적으로 작용할 가능성이 높아 과적합 문제를 해결할 수 있으므로 일반화 성능 측면에서 이점이 있다는 것이다.
풀 배치의 경우 학습 데이터를 너무 정밀하게 학습하여 과적합 문제가 발생할 여지가 있다.
반대로 배치를 사용하면 학습 데이터에 완벽히 맞추지 않아 과적합 문제를 해결하여 일반화 성능을 향상한다는 것이다.
이 섹션에서는 torch.util.data.DataLoader 클래스를 사용하여 데이터를 배치로 구조화하는 방법을 알아보겠다.
DataLoader 클래스는 다음과 같이 호출한다.
DataLoader(data, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
여기서 batch_size는 몇 개의 데이터로 나누어 처리할 지 결정한다.
DataLoader를 사용하면 매 에포크마다 batch_size 크기의 배치를 반복한다.
배치 순서는 디폴트 값에 의해 결정되지만 shuffle을 True로 설정하여 DataLoader에 배치를 셔플하도록 요청할 수 있다.
셔플을 하면 좋지 않은 배치를 여러번 만나지 않도록 할 수 있다.
DataLoader가 제공되는 경우 준비한 배치를 collate_fn에 전달한다.
collate_fn에는 배치에 대한 통계를 출력하거나 추가 처리를 수행하기 위한 커스텀 함수를 작성하여 넘겨줄 수 있다.
학습 데이터의 문장 길이는 다 같지 않으므로 길이가 다른 시퀀스를 다룰 때는 collate_fn 인자를 사용하여
각 샘플을 어떻게 하나의 배치로 묶을지 정의해야 한다.
일반적으로 패딩을 사용하여 모드 시퀀스의 길이를 동일하게 맞춘다.
이번의 경우 커스텀 collate_fn을 사용하여 다음을 수행할 것이다.
학습 문장에 윈도우 패딩 적용
학습 문장의 단어를 인덱스로 변환
모든 문장과 레이블의 길이가 같도록 학습 문장 패딩 및 레이블도 패딩
원래의 레이블 길이는 따로 저장
원래의 레이블 길이를 따로 저장하는 이유는 손실을 계산할 때 샘플의 실제 단어 개수를 알아야 하기 때문이다.
패딩된 토큰까지 포함하여 손실을 계산하면 학습이 왜곡되므로 원래 문장 길이를 기록하여 손실 계산에 활용하여 패딩 부분을 무시한다.
커스텀 collate_fn을 만들어 보자.
윈도우 패딩을 적용할 거니까 매개 변수로 window_size를 받도록 하고
인덱스 변환을 위해 단어와 인덱스 매핑 정보인 word_to_ix도 매개 변수로 받도록 하자.
from torch.utils.data import DataLoader
from functools import partial # 함수와 고정할 인자를 받아 그 인자가 이미 적용된 객체 반환
def custom_collate_fn(batch, window_size, word_to_ix):
# 배치를 학습 예제(x)와 레이블(y)로 나누기
x, y = zip(*batch)
# 1. 윈도우 패딩 함수
def pad_window(sentence, window_size, pad_token="<pad>"):
window = [pad_token] * window_size
return window + sentence + window
# 1. 학습 문장 패딩
x = [pad_window(s, window_size=window_size) for s in x]
# 2. 인덱싱 함수
def convert_tokens_to_indices(sentence, word_to_ix):
return [word_to_ix.get(token, word_to_ix["<unk>"]) for token in sentence]
# 2. 학습 문장 인덱싱
x = [convert_tokens_to_indices(s, word_to_ix) for s in x]
# 3. 길이 맞추기 패딩
# 패딩 토큰 인덱스 지정
pad_token_ix = word_to_ix["<pad>"]
# pad_sequence 함수는 입력이 텐서여야 하므로 x를 텐서로 변환함
x = [torch.LongTensor(x_i) for x_i in x]
x_padded = nn.utils.rnn.pad_sequence(x, batch_first=True, padding_value=pad_token_ix)
# 4. 레이블 패딩 전에 원래 문장 길이 저장
lengths = [len(label) for label in y]
lengths = torch.LongTensor(lengths)
# 3. 레이블도 길이 맞추기 패딩
# y 또한 텐서로 변환
y = [torch.LongTensor(y_i) for y_i in y]
y_padded = nn.utils.rnn.pad_sequence(y, batch_first=True, padding_value=0)
# 준비 완료
return x_padded, y_padded, lengths
위 함수는 길어보이지만 실제로 길지 않다.
윈도우 패딩과 인덱싱 함수의 경우 이미 앞에서 정의하여 출력을 확인했었다.
따라서 불필요한 함수와 주석을 제거하면 아래와 같다.
def _custom_collate_fn(batch, window_size, word_to_ix):
x, y = zip(*batch)
# 1. 윈도우 패딩
x = [pad_window(s, window_size=wondow_size) for s in x]
# 2. 인덱싱
x = [convert_tokens_to_indices(s, word_to_ix) for s in x]
# 3. 길이 맞추기 패딩
pad_token_ix = word_to_ix["<pad>"]
x = [torch.LongTensor(x_i) for x_i in x]
x_padded = nn.utils.rnn.pad_sequence(x, batch_first=True, padding_value=pad_token_ix)
# 3. 길이 맞추기 패딩
# 4. 원래 문장 길이 저장
lengths = [len(label) for label in y]
lengths = torch.LongTensor(lengths)
y = [torch.LongTensor(y_i) for y_i in y]
y_padded = nn.utils.rnn.pad_sequence(y, batch_first=True, padding_value=0)
return x_padded, y_padded, lengths
이제 DataLoader의 활용을 보자.
data = list(zip(train_sentences, train_labels))
batch_size = 2
shuffle = True
window_size = 2
# partial는 함수의 매개변수 입력을 미리 지정
# custom_collate_fn의 매개변수 window_size와 word_to_ix에 미리 입력
collate_fn = partial(custom_collate_fn, window_size=window_size, word_to_ix=word_to_ix)
# DataLoader 예시 보기
loader = DataLoader(data, batch_size=batch_size, shuffle=shuffle, collate_fn=collate_fn)
# 루프 한번 통과
counter = 0
for batched_x, batched_y, batched_lengths in loader:
print(f"Iteration {counter}")
print("Batched Input:")
print(batched_x)
print("Batched Labels:")
print(batched_y)
print("Batched Lengths:")
print(batched_lengths)
print("")
counter += 1
Iteration 0
Batched Input:
tensor([[ 0, 0, 19, 16, 12, 8, 4, 0, 0],
[ 0, 0, 9, 7, 8, 18, 0, 0, 0]])
Batched Labels:
tensor([[0, 0, 0, 0, 1],
[0, 0, 0, 1, 0]])
Batched Lengths:
tensor([5, 4])
Iteration 1
Batched Input:
tensor([[ 0, 0, 22, 2, 6, 20, 15, 0, 0, 0],
[ 0, 0, 19, 5, 14, 21, 12, 3, 0, 0]])
Batched Labels:
tensor([[0, 0, 0, 0, 1, 0],
[0, 0, 0, 1, 0, 1]])
Batched Lengths:
tensor([5, 6])
Iteration 2
Batched Input:
tensor([[ 0, 0, 10, 13, 11, 17, 0, 0]])
Batched Labels:
tensor([[0, 0, 0, 1]])
Batched Lengths:
tensor([4])
반복문을 통해 loader로부터 한 배치씩 출력되는 것을 확인할 수 있다.
이 함수의 호출과 동작을 정리하면
loader는 한번에 batch_size만큼 샘플을 꺼낸다. 위의 예시는 2개씩.
batch_size만큼의 샘플을 collate_fn에 넘겨준다.
collate_fn에서는 샘플을 받아 윈도우 패딩, 인덱싱, 길이 맞추기 패딩을 수행한다.
custom_collate_fn의 window_size와 word_to_ix가 partial에 의해 지정되었는데
batch는 custom_collate_fn가 DataLoader에서 호출될 때도 아무런 입력이 없다.
이는 DataLoader 함수 내부적으로 batch를 collate_fn 함수에 넘겨주므로 따로 입력을 할 필요가 없기 때문이다.
DataLoader의 출력, 즉 배치 처리된 텐서가 임베딩을 거쳐 [batch, seq_len, dim] 형태의 텐서로 변환되고
이 형태의 텐서가 모델에 입력될 것이다.
하지만 다시 생각해보자.
태스크는 Word Window Classifier이다.
그러니까 윈도우를 통해 문맥을 파악하는 분류기를 만들어야 한다.
따라서 모델이 전체 문장을 한번에 처리하는 것이 아니라 각 단어마다 주변 윈도우를 보고
이를 통해 중심 단어의 정답을 예측하는 구조라는 것이다.
현재 텐서는 하나의 문장 전체가 하나의 데이터 포인트로 구성되어 있다.
앞서 loader를 하나씩 출력해본 결과를 살펴보면 하나의 문장이 으로 변환된 것을 확인할 수 있다.
이것이 하나의 문장이 하나의 데이터 포인트로 구성되어 있음을 의미한다.
하지만 우리는 윈도우를 활용해야 하므로 각 단어마다 윈도우를 만들고 이 중심 단어가 위치와 관련된 단어인지 아닌지 예측하고 각 에측을 하나의 출력 벡터로 모아 반환해야 한다.
다시 말해 모델이 입력 전체를 받되 내부적으로 단어 단위로 나누어 처리를 해야 한다는 말이다.
데이터를 collate_fn, 전처리 등을 통해 사전에 슬라이딩 윈도우 단위로 나눴다면 상관없지만 현재 슬라이딩 윈도우 단위로 나누지 않았으므로 윈도우를 나누는 작업을 모델 안에서 하도록 처리할 것이다.
다시 말해 입력은 [batch, seq_len]을 받아 슬라이딩 윈도우 처리 후 임베딩을 거쳐 모델에 최종 입력될 것이다.
window_size가 N이면 모델은 매 2N+1개의 토큰마다 한번씩 예측한다.
중심 단어 좌측에 N개, 우측에 N개가 있고 중심 단어는 하나이므로
이다.
예를 들어 window_size가 2라면 매번 5개의 단어를 하나의 윈도우로 사용한다.
시퀀스 길이가 9, 즉 토큰 개수가 9개인 문장을 생각해보자.
이때 9는 패딩을 포함한 전체 길이를 의미한다.
그리고 window_size가 2라면 윈도우는 5개의 단어로 구성되므로 중심 단어 위치에 해당하는 예측은 총 5개이다.
문장 으로 예를 들어 보자.
이 문장에서 윈도우 패딩을 빼면 이며 5개의 단어로 구성되어 있다.
이 문장에 대한 슬라이딩 윈도우는 아래와 같다.
[[ 0, 0, 19, 16, 12],
[ 0, 19, 16, 12, 8],
[19, 16, 12, 8, 4],
[16, 12, 8, 4, 0],
[12, 8, 4, 0, 0]]
중심 단어가 순서대로 19, 16, 12, 8, 4인 것을 확인할 수 있다.
슬라이딩 윈도우를 파이썬 루프로 구현할 수 있지만 파이토치에서는 더 빠른 벡터화 연산인 tensor.unfold()를 사용할 수 있다.
이 메서드는 텐서의 특정 차원을 따라 슬라이딩 윈도우 방식으로 데이터를 추출하여 새로운 텐서를 생성하는 함수이다.
unfold(dimension, size, step)으로 이용한다.
아래의 코드를 살펴보자.
# 원본 텐서 출력
print(f"Original Tensor: ")
print(batched_x)
print("")
# 2 * 2 + 1 chunks 만들기
chunk = batched_x.unfold(1, window_size*2 + 1, 1)
print(f"Windows: ")
print(chunk)
Original Tensor:
tensor([[ 0, 0, 10, 13, 11, 17, 0, 0]])
Windows:
tensor([[[ 0, 0, 10, 13, 11],
[ 0, 10, 13, 11, 17],
[10, 13, 11, 17, 0],
[13, 11, 17, 0, 0]]])
데이터가 다 준비되었으니 모델을 구축할 준비가 되었다.
우리가 만들 모델의 주요 매개 변수는 hyperparameters, vocab_size, pad_ix이다.
hyperparameters는 딕셔너리 형식이며 모델의 변수들을 초기화한다.
vocab_size는 고유 단어의 개수로 단어가 인덱싱되었을 때 총 몇 개의 인덱스가 존재하는 지를 의미한다.
vocab_size는 임베딩에 필요하다.
pad_ix 또한 임베딩에 필요하며 패딩 토큰의 인덱스가 몇 번 인덱스인지를 의미한다.
우리는 <pad>를 0번으로 설정할 것이므로 디폴트 값을 0으로 설정할 것이다.
__init__에서는 hyperparameters로 부터 인스턴스 변수를 설정할 것이며 vocab_size와 pad_ix를 이용해 임베딩도 만들 것이다.
윈도우로 나누어 문맥을 확인하여 중심 단어를 예측하기로 했으므로 은닉층의 입력 크기는 이 윈도우 크기에 맞춘다.
그러면 full_window_size * embed_dim차원이 입력 차원이 된다.
class WordWindowClassifier(nn.Module):
def __init__(self, hyperparameters, vocab_size, pad_ix=0):
super(WordWindowClassifier, self).__init__()
"""인스턴스 변수"""
self.window_size = hyperparameters["window_size"]
self.embed_dim = hyperparameters["embed_dim"]
self.hidden_dim = hyperparameters["hidden_dim"] # hidden layer의 출력 차원
self.freeze_embeddings = hyperparameters["freeze_embeddings"]
""" Embedding Layer
임베딩 인덱스를 포함하는 텐서를 입력받아 해당 임베딩을 반환
출력은 dim(인덱스 수 * 임베딩 차원) -> 예: 입력(23차원 임베딩) -> 출력(23x5 임베딩)
freeze_embeddings가 True면 임베딩 레이어 파라미터를 학습 불가능으로 설정
임베딩 파라미터를 제외한 다른 파라미터만 변경하려는 경우 유용
freeze_embeddings가 True라면 requires_grad를 False로 설정
False -> embed_layer의 가중치는 역전파 과정에서 기울기가 계산되지 않아 업데이트되지 않음
"""
self.embeds = nn.Embedding(vocab_size, self.embed_dim, padding_idx=pad_ix)
if self.freeze_embeddings:
self.embed_layer.weight.requires_grad = False
"""Hidden Layer
full_window_size는 하나의 윈도우에 포함되는 총 단어 개수
"""
full_window_size = 2 * self.window_size + 1
self.hidden_layer = nn.Sequential(
nn.Linear(full_window_size * self.embed_dim, self.hidden_dim),
nn.Tanh() # optim -> non-linear
)
"""Output Layer
"""
self.output_layer = nn.Linear(self.hidden_dim, 1)
""" Probabilities
이진 분류 -> 로짓 값을 0과 1사이의 확률 값으로 변환
"""
self.probabilities = nn.Sigmoid()
def forward(self, inputs):
"""
B:= batch_size
L:= window-padded sentence length
D:= self.embed_dim
S:= self.window_size
H:= self.hidden_dim
입력: 인덱싱된 토큰들에 대한 (B, L) 텐서 = 배치 크기 x 윈도우 패딩된 문장 길이
"""
B, L = inputs.size()
"""
Reshaping.
(B, L) LongTensor -> (B, L~, S) LongTensor
L~ = L_adjusted = 윈도우 패딩 전 원래 문장 길이 = L - (full_window_size - 1) (예: 4)
S = full_window_size = 각 윈도우에 포함된 토큰 개수 (예: 5)
예: [ 0, 0, 10, 13, 11, 17, 0, 0]
-> L = 8, S = 5
-> L~ = 8 - (5 - 1) = 4
-> (B, L~, S) = (B, 4, 5)
-> token_windows = 4 x 5 tensor [[ 0, 0, 10, 13, 11],
[ 0, 10, 13, 11, 17],
[10, 13, 11, 17, 0],
[13, 11, 17, 0, 0]]
"""
token_windows = inputs.unfold(1, 2 * self.window_size + 1, 1)
_, adjusted_length, _ = token_windows.size()
# 텐서 크기의 정상성 검사 = token_windows의 실제 크기가 예상과 일치하는지 확인
assert token_windows.size() == (B, adjusted_length, 2 * self.window_size + 1)
"""
Embedding.
(B, L~, S)의 torch.LongTensor -> (B, L~, S, D)의 FloatTensor 출력
(B, L_adjusted, full_window_size) -> (B, L_adjusted, full_window_size, self.embed_dim)
"""
embedded_windows = self.embeds(token_windows)
"""
Reshaping.
nn.Linear 레이어의 입력에 맞게 임베딩된 윈도우 텐서 평탄화
(B, L~, S, D) FloatTensor -> (B, L~, S*D) FloatTensor
view()로 텐서 형태 변경(reshape)
B, adjusted_length-> 배치 크기, 조정 길이 차원은 그대로 유지
나머지 모든 차원(full_window_size, self.embed_dim)을 곱하여 하나의 차원으로 평탄화
-> 이때 -1 사용
"""
embedded_windows = embedded_windows.view(B, adjusted_length, -1)
"""
Layer 1.
(B, L~, S*D) FloatTensor -> (B, L~, H) FloatTensor
(full_window_size * self.embed_dim) -> (self.hidden_dim)
"""
layer_1 = self.hidden_layer(embedded_windows)
"""
Layer 2.
(B, L~, H) FloatTensor -> (B, L~, 1) FloatTensor
(B, L_adjusted, self.hidden_dim) -> (B, L_adjusted, 1)
텐서의 각 값은 위치를 의미할 확률에 대한 정규화되지 않은 점수(로짓)
"""
output = self.output_layer(layer_1)
"""
Sigmoid.
(B, L~, 1) FloatTensor (정규화되지 않은 점수) -> (B, L~, 1) FloatTensor (정규화된 점수)
(B, L~, 1) -> view() -> (B, L~ * 1) = (B, L~)
텐서 각 요소의 의미: 해당 윈도우의 중심 단어가 location일 확률
예: (2, 4) -> 각 배치에서 4개의 윈도우에 대한 확률이 제공됨
"""
output = self.probabilities(output)
output = output.view(B, -1)
return output
먼저 데이터를 준비하고 모델을 초기화하자.
그 다음 옵티마이저와 손실 함수를 정의하자.
손실 함수는 사전에 정의된 손실 함수를 사용하는 대신 자체 손실 함수를 만들어 정의해보자.
# 데이터 준비
data = list(zip(train_sentences, train_labels))
batch_size = 2 # B
shuffle = True
window_size = 2 # <pad> <pad> ... <pad> <pad>
# window padding -> padding x -> indexing -> padding y
collate_fn = partial(custom_collate_fn, window_size=window_size, word_to_ix=word_to_ix)
# 데이터로더
loader = DataLoader(data, batch_size=batch_size, shuffle=shuffle, collate_fn=collate_fn)
# 모델 초기화
# 모델 파라미터를 딕셔너리 형식으로 설정하는 것이 유용
model_hyperparameters = {
"batch_size": 4,
"window_size": 2,
"embed_dim": 25, # -> 윈도우를 몇 차원으로 표현할 것인지 (full_window_size * embed_dim)
"hidden_dim": 25, # -> 은닉층 노드 개수 -> 몇 차원으로 압축할 것인지
"freeze_embeddings": False, # -> 학습 중 임베딩 업데이트 x
}
# 고유 단어 총 개수: (words -> index(dictionary)) = word_to_ix -> len(word_to_ix)
vocab_size = len(word_to_ix)
model = WordWindowClassifier(model_hyperparameters, vocab_size)
# 옵티마이저 정의 (가중치를 업데이트할 때 사용할 함수)
learning_rate = 0.01
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
# 손실 함수 정의 (binary cross entropy loss 계산)
def loss_function(batch_outputs, batch_labels, batch_lengths):
# 전체 배치에 대한 손실 계산
bceloss = nn.BCELoss() # Binary Cross Entropy Loss
loss = bceloss(batch_outputs, batch_labels.float()) # 예측값, 실제값 입력하여 손실 계산
# loss 리스케일링
# '각' 학습 문장의 단어 개수는 다를 수 있다 -> 배치마다 예측 항의 수가 다르면 손실 크기 불균형 발생
# 전체 윈도우 수로 나누어 실제 문장 길이(윈도우 수)에 따른 스케일 조절
loss = loss / batch_lengths.sum().float()
return loss
이제 학습 함수를 만들어보자.
배치를 사용하므로 한 에포크마다 배치 개수만큼의 학습이 수행되어야 한다.
따라서 한 에포크마다 호출되어 배치에 따른 학습을 수행하는 함수와 전체 학습 함수 총 2개를 만들 것이다.
# 매 에포크마다 호출될 함수
def train_epoch(loss_function, optimizer, model, loader):
# 배치마다 총 손실의 추적 유지
total_loss = 0
for batch_inputs, batch_labels, batch_lengths in loader:
# 그래디언트 0으로 초기화
optimizer.zero_grad()
# 순전파
outputs = model.forward(batch_inputs)
# 배치 손실 계산
loss = loss_function(outputs, batch_labels, batch_lengths)
# 그래디언트 계산
loss.backward()
# 파라미터 업데이트
optimizer.step()
total_loss += loss.item()
return total_loss
# 메인 학습 루프를 포함한 함수
def train(loss_function, optimizer, model, loader, num_epochs=10000):
# 각 에포크마다 train_epoch 함수 호출을 반복
for epoch in range(num_epochs):
epoch_loss = train_epoch(loss_function, optimizer, model, loader)
if epoch % 100 == 0: print(epoch_loss)
이제 학습을 시켜보자.
num_epochs = 1000
train(loss_function, optimizer, model, loader, num_epochs=num_epochs)
0.3301669880747795
0.2386614829301834
0.20520811155438423
0.13998496904969215
0.1253802478313446
0.08283618465065956
0.07396731711924076
0.05714165885001421
0.04130138736218214
0.039603686425834894
전체 손실이 줄어드는 것을 확인할 수 있다!
모델의 위치를 의미하는 단어를 잘 예측하는지 살펴보자.
테스트 데이터를 만드는 것부터 시작해보자.
# 테스트 문장 만들기
test_corpus = ["She comes from Paris"]
test_sentences = [s.lower().split() for s in test_corpus]
test_labels = [[0, 0, 0, 1]]
# 테스트 로더 만들기
test_data = list(zip(test_sentences, test_labels))
batch_size = 1
shuffle = False
window_size = 2
collate_fn = partial(custom_collate_fn, window_size=2, word_to_ix=word_to_ix)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=1, shuffle=False, collate_fn=collate_fn)
테스트 결과를 출력해보자.
for test_instance, labels, _ in test_loader:
outputs = model.forward(test_instance)
print(labels)
print(outputs)
tensor([[0, 0, 0, 1]])
tensor([[0.1062, 0.0382, 0.0342, 0.8776]], grad_fn=<ViewBackward0>)