이번 과제의 목표는 Recurrent Neural Network (RNN) 을 이용하여 language modling task 를 학습하는 것이다.
RNN 모델을 구현하고, 주어진 데이터를 가공하여 모델을 학습한 후 학습된 언어 모델을 이용해 문장을 생성하는 것이 과제 내용이다.
Dictionary: 데이터에 등장하는 어휘의 집합. 집합 내 어휘를 unique한 index에 mapping 한다.
Corpus:모델의 학습, 테스트 과정에서 사용되는 입력을 준비한다. 데이터를 load 하고 dictionary 를 생성한다. 데이터를 tokenize 하고 생성한 dictionary 를 이용해 각 단어(tokenize 된 output)를 id로 변환한다.
class Dictionary(object):
def __init__(self):
self.word2idx = {}
self.idx2word = []
def add_word(self, word):
if word not in self.word2idx:
self.idx2word.append(word)
self.word2idx[word] = len(self.idx2word) - 1
return self.word2idx[word]
def __len__(self):
return len(self.idx2word)
위의 Code는 Dictionary Class 이다.
add_word 함수를 보면, idx2word는 현재까지 들어온 unique한 어휘의 갯수와 index를 알기 위해 word2idx dict에 현재 들어온 어휘가 없다면, append 하여 길이를 늘려준다.
word2idx는 dict 타입으로써 지금까지 들어온 어휘의 idx에 따라 번호를 부여 받는다.
class Corpus(object):
def __init__(self, path):
self.dictionary = Dictionary()
self.train = self.tokenize(os.path.join(path, 'train.txt'))
self.valid = self.tokenize(os.path.join(path, 'valid.txt'))
self.test = self.tokenize(os.path.join(path, 'test.txt'))
def tokenize(self, path):
assert os.path.exists(path)
with open(path, 'r', encoding="utf8") as f:
for line in f:
words = line.split() + ['<eos>']
for word in words:
self.dictionary.add_word(word)
with open(path, 'r', encoding="utf8") as f:
idss = []
for line in f:
words = line.split() + ['<eos>']
ids = []
for word in words:
ids.append(self.dictionary.word2idx[word])
idss.append(torch.tensor(ids).type(torch.int64))
ids = torch.cat(idss)
return ids
위 code는 Corpus Class이다.
tokenize 함수를 보면, 각 data의 line 별로 parsing을 하면서 위에 정의한 Dictionary Class의 add_word를 이용하여 어휘를 idx로 변환 해준다.
두번째 file을 open 했을 때는 위에서 만든 어휘별로 만든 dict를 이용하여 Data를 idx tenser로 변환 해준다.
class RNNModel(nn.Module):
"""Container module with an encoder, a recurrent module, and a decoder."""
def __init__(self, rnn_type, ntoken, ninp, nhid, nlayers, dropout=0.5):
super(RNNModel, self).__init__()
self.ntoken = ntoken
self.drop = nn.Dropout(dropout)
self.encoder = nn.Embedding(ntoken, ninp)
if rnn_type in ['LSTM', 'GRU']:
self.rnn = getattr(nn, rnn_type)(ninp, nhid, nlayers, dropout=dropout)
else:
try:
nonlinearity = {'RNN_TANH': 'tanh', 'RNN_RELU': 'relu'}[rnn_type]
except KeyError:
raise ValueError( """An invalid option for `--model` was supplied,
options are ['LSTM', 'GRU', 'RNN_TANH' or 'RNN_RELU']""")
self.rnn = nn.RNN(ninp, nhid, nlayers, nonlinearity=nonlinearity, dropout=dropout)
self.decoder = nn.Linear(nhid, ntoken)
self.init_weights()
self.rnn_type = rnn_type
self.nhid = nhid
self.nlayers = nlayers
def init_weights(self):
initrange = 0.1
nn.init.uniform_(self.encoder.weight, -initrange, initrange)
nn.init.zeros_(self.decoder.weight)
nn.init.uniform_(self.decoder.weight, -initrange, initrange)
def forward(self, input, hidden):
encoded=self.encoder(input)
out,hidden=self.rnn(encoded,hidden)
out=self.drop(out)
decoded=self.decoder(out)
decoded=decoded.view(-1,self.ntoken)
return F.log_softmax(decoded, dim=1), hidden
def init_hidden(self, bsz):
weight = next(self.parameters())
if self.rnn_type == 'LSTM':
return (weight.new_zeros(self.nlayers, bsz, self.nhid),
weight.new_zeros(self.nlayers, bsz, self.nhid))
else:
return weight.new_zeros(self.nlayers, bsz, self.nhid)
위 Code는 RNN Model을 설계한 Code이다.
RNN의 type은 LSTM 과 LSTM을 경량화 한, GRU가 있다.
Forward 함수를 살펴보자. 함수의 파라미터로는 input과 hidden이 들어오는 것을 확인할 수 있다. input은 bacth size 만큼 짤린 tenser이고 hidden은 hidden node라고 생각 할 수 있다. 여기서 위에 정의한 self.encoder를 활용하여 input을 임베딩 시켜준 후, Rnn을 통과 시켜 out과 wight가 학습된 hidden node들을 얻는다.
위에서 얻은 output을 dropout을 시켜주어 처리해야할 정보량을 줄인 후, ntoken 만큼을 -1 방향으로 transfose 시켜 주고, softmax 활성 함수를 거쳐 return 해준다.
이렇게 Transfose를 하는 이유는 Model을 train 할 때, loss를 구해야할 target tensor와 차원을 맞추기 위해서 이다.
def train():
# Turn on training mode which enables dropout.
model.train()
total_loss = 0.
start_time = time.time()
ntokens = len(corpus.dictionary)
hidden = model.init_hidden(args.batch_size)
for batch, i in enumerate(range(0, train_data.size(0) - 1, args.bptt)):
data, targets = get_batch(train_data, i)
model.zero_grad()
hidden = repackage_hidden(hidden)
output, hidden = model(data, hidden)
loss=criterion (output,targets)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), args.clip)
for p in model.parameters():
p.data.add_(p.grad, alpha=-lr)
total_loss += loss.item()
if batch % args.log_interval == 0 and batch > 0:
cur_loss = total_loss / args.log_interval
elapsed = time.time() - start_time
print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.2f} | ms/batch {:5.2f} | '
'loss {:5.2f} | ppl {:8.2f}'.format(
epoch, batch, len(train_data) // args.bptt, lr,
elapsed * 1000 / args.log_interval, cur_loss, math.exp(cur_loss)))
total_loss = 0
start_time = time.time()
if args.dry_run:
break
loss=criterion (output,targets) 이 부분에서 위에서 정의한 loss함수를 통하여, Model과 라벨링된 정답인 Targets의 차이를 구한다.
loss.backward()에서는 위에서 구한 loss를 바탕으로 역전파를 진행하여 각 wight를 업데이트 한다.