Subword Tokenization

JaeYoung Lee·2023년 11월 26일

NLP

목록 보기
1/1

Subword란

Subword는 하나의 단어를 여러개의 단위로 분리했을 때 하나의 단위를 나타냅니다. "subword"를 subword 단위로 나타낸 하나의 예시는 다음과 같습니다.

"sub" + "word"

sub라는 접두사와 word라고 하는 어근으로 나누어 "subword"라고 하는 word를 2개의 subword로 나타냈습니다.

이외에도 다양한 형태의 subword로 나타낼 수 있습니다. (e.g., "su" + "bword", "s" + "ubword", "subwor" + "d")

tokenization

tokenization은 주어진 입력 데이터를 자연어처리 모델이 인식할 수 있는 단위로 변환해주는 방법입니다.

word tokenization

word tokenization의 경우 "단어"가 자연어처리 모델이 인식하는 단위가 됩니다.
"I have a meal"이라고 하는 문장을 가지고 word tokenization을 하면 다음과 같습니다.

['I', 'have', 'a', 'meal']

영어의 경우 대부분 space를 기준으로 단어가 정의되기 때문에 .split()을 이용해 쉽게 word tokenization을 구현할 수 있습니다.
영어에서 word tokenization은 space tokenization이라고도 할 수 있고, subword tokenization 이전에 수행되는 pre-tokenization 방법으로도 많이 사용됩니다.

한국어의 경우 "나는 밥을 먹는다"라는 문장을 word tokenization하면 다음과 같습니다.

['나', '는', '밥', '을', '먹는다']

한국어에서 "단어"는 공백(space)을 기준으로 정의되지 않습니다. 이는 한국어가 갖고 있는 "교착어"로서의 특징 때문입니다.
체언 뒤에 조사가 붙는 것이 대표적인 특징이며 의미 단위가 구분되고 자립성이 있기 때문에 조사는 "단어"입니다.

따라서 한국어에서는 pre-tokenization 방법으로 space tokenization을 사용하지 않고 형태소 분석기를 활용하고 있습니다.

그렇다면 Subword tokenization은 무엇인가요?

Subword tokenization

Subword tokenizaiton은 말 그대로 subword 단위로 tokenization을 한다는 뜻입니다.
character와 word 사이에 위치한다고 볼 수 있겠네요.

방금 전 word tokenization을 수행했던 문장을 이용해 subword tokenization을 수행한 예시를 보겠습니다.

Subword tokenization을 적용했을 때는 다음과 같이 tokenization이 될 수 있습니다.

Example 1
"I have a meal" -> ['I', 'hav', 'e', 'a', 'me', 'al']
"나는 밥을 먹는다" -> ['나', '는', '밥', '을', '먹는', '다']

word 단위가 아니라 그보다 더 잘게 쪼갠 subword 단위로 문장을 tokenization합니다.

위에서 말씀드린 것과 같이 여러가지 경우의 수가 가능합니다.

Example 2
"I have a meal" -> ['I', 'ha', 've', 'a', 'mea', 'l']
"나는 밥을 먹는다" -> ['나', '는', '밥', '을', '먹', '는다']

그렇지만 기본적으로 공백을 넘어선 subword를 구성하진 않습니다.
즉, 이미 띄어쓰기가 되어있는 상태를 굳이 다시 합쳐 구성하진 않습니다.
예를 들어 다음과 같이 tokenizaiton을 수행하진 않습니다.

Example 3
"I have a meal" -> ['Iha', 've', 'am', 'ea', 'l']
"나는 밥을 먹는다" -> ['나는밥', '을먹', '는다']

그렇다면 subword tokenization이 왜 필요할까?
word tokenization 코드를 불러와 그 필요성을 생각해 봅시다.

import os
from io import open
import torch

class Dictionary(object):   # vocab
    def __init__(self):
        self.word2idx = {'<unk>': 0} # dict
        self.idx2word = ['<unk>']    # list

    def add_word(self, word):
        if word not in self.word2idx:     # idx2word에 해당 단어가 없다면
            self.idx2word.append(word)    # 해당 word를 추가하고
            self.word2idx[word] = len(self.idx2word) - 1    #word2idx에 해당 단어에 대응되는 idx값을 업데이트트
        return self.word2idx[word]

    def __len__(self):
        return len(self.idx2word)         # vocab의 길이를 return
class Corpus(object):
    def __init__(self, path):
        self.dictionary = Dictionary()
        """Tokenizes a text file."""
        assert os.path.exists(path)   # path가 유효하다면 Return True.
        # Add words to the dictionary
        with open(os.path.join(path, 'train.txt'), 'r', encoding="utf8") as f:
            for line in f:
                words = line.split() + ['<eos>'] # ex) [i, study, math, <eos>]
                for word in words:
                    self.dictionary.add_word(word) # dict에 token을 추가해줌

        self.train = self.tokenize(os.path.join(path, 'train.txt'))
        self.valid = self.tokenize(os.path.join(path, 'valid.txt'))
        self.test = self.tokenize(os.path.join(path, 'test.txt'))

    def tokenize(self, path):
        # Tokenize file content
        with open(path, 'r', encoding="utf8") as f:
            idss = [] # 전체 text를 담을 공간
            for line in f:
                words = line.split() + ['<eos>']
                ids = [] # 각 문장들을 index값으로 변경한 값이 담김김
                for word in words:
                    try:
                        ids.append(self.dictionary.word2idx[word]) # word에 대응되는 index값들이 담겨요
                    except: # ERROR 발생 = vocab에 없는 단어 발생생
                        print(word)   
                        ids.append(0) # <unk>으로 대체체
                idss.append(torch.tensor(ids).type(torch.int64))
            ids = torch.cat(idss)

        return ids
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class RNNModel(nn.Module):
    """Container module with an encoder, a recurrent module, and a decoder."""

    def __init__(self, rnn_type, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(RNNModel, self).__init__()
        self.ntoken = ntoken
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp)
        if rnn_type in ['LSTM', 'GRU']:
            self.rnn = getattr(nn, rnn_type)(ninp, nhid, nlayers, dropout=dropout)
        else:
            try:
                nonlinearity = {'RNN_TANH': 'tanh', 'RNN_RELU': 'relu'}[rnn_type]
            except KeyError:
                raise ValueError( """An invalid option for `--model` was supplied,
                                 options are ['LSTM', 'GRU', 'RNN_TANH' or 'RNN_RELU']""")
            self.rnn = nn.RNN(ninp, nhid, nlayers, nonlinearity=nonlinearity, dropout=dropout)
        self.decoder = nn.Linear(nhid, ntoken)

        self.init_weights()

        self.rnn_type = rnn_type
        self.nhid = nhid
        self.nlayers = nlayers

    def init_weights(self):
        initrange = 0.1
        nn.init.uniform_(self.encoder.weight, -initrange, initrange) # -0.1~0.1 범위의 유니폼 난수
        nn.init.zeros_(self.decoder.weight)
        nn.init.uniform_(self.decoder.weight, -initrange, initrange)

    def forward(self, input, hidden):
        emb = self.drop(self.encoder(input)) # drop?
        output, hidden = self.rnn(emb, hidden)
        output = self.drop(output)
        decoded = self.decoder(output)
        decoded = decoded.view(-1, self.ntoken)
        return F.log_softmax(decoded, dim=1), hidden

    def init_hidden(self, bsz):
        weight = next(self.parameters())
        if self.rnn_type == 'LSTM':
            return (weight.new_zeros(self.nlayers, bsz, self.nhid),
                    weight.new_zeros(self.nlayers, bsz, self.nhid))
        else:
            return weight.new_zeros(self.nlayers, bsz, self.nhid)
import time
import math
import os
import torch
import torch.nn as nn

import easydict
args = easydict.EasyDict({
    "data"    : './data/wikitext-2',    # location of the data corpus
    "model"   : 'RNN_TANH',             # type of recurrent net (RNN_TANH, RNN_RELU, LSTM, GRU)
    "emsize"  : 200,                    # size of word embeddings
    "nhid"    : 512,                    # number of hidden units per layer (히든 레이어 한 층의 노드 수)
    "nlayers" : 2,                      # number of layers
    "lr"      : 20,                     # initial learning rate
    "clip"    : 0.25,                   # gradient clipping
    "epochs"  : 6,                      # upper epoch limit
    "batch_size": 20,                   # batch size
    "bptt"    : 35,                     # sequence length
    "dropout" : 0.2,                    # dropout applied to layers (0 = no dropout)
    "seed"    : 1111,                   # random seed
    "cuda"    : True,                   # use CUDA
    "log_interval": 200,                # report interval
    "save"    : 'model.pt',             # path to save the final model
    "dry_run" : True,                   # verify the code and the model

})

# 디바이스 설정
device = torch.device("cuda" if args.cuda else "cpu")

# train.txt의 문장들을 word tokenization 해보고 단어들의 개수를 세어보겠습니다

corpus = Corpus('./data/wikitext-2')
ntokens = len(corpus.dictionary)
print(ntokens)  # vocab(사전)의 size
# 33278

embedding dimension의 크기는 200이므로 word embedding에 사용된 parameter의 수는 33278 x 200 (6,655,600개)입니다.
-> parameter라는 표현의의 이유는 우리가 지정한 embedding dim이 200이라면 각각 word마다 200 dimension의 벡터로 표현된다. 처음엔 랜덤한 값으로 초기화 되고, 후에 학습을 하면서 embedding vector들이 update 되기 떄문에

그렇다면, RNN 모델에 사용되는 weight의 parameter 개수는 몇개인지 간단한 함수를 이용해 확인해보겠습니다

model = RNNModel(args.model, ntokens, args.emsize, args.nhid, args.nlayers, args.dropout)

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
    
print(f"Word embedding parameter 개수: {count_parameters(model.encoder)}")
print(f"RNN parameter 개수: {count_parameters(model.rnn)}")

Word embedding parameter 개수: 6655600
RNN parameter 개수: 890880

RNN parameter, Word embedding parameter 개수를 비교해보면 embedding parameter가 압도적으로 많습니다. (여기서 parameter란 학습과정에서 업데이트 되는 변수를 의미합니다.)
training에 사용되는 text file의 크기가 커질수록 word embedding parameter는 더 커지게 되고 전체 parameter에서 word embedding parameter가 차지하는 비중이 매우 높음을 볼 수 있습니다.

이런 parameter 비중의 비대칭성을 해결하기 위해 처음에는 character-level tokenization 방법이 주목을 받았습니다.

Character-level tokenization

말 그대로 하나의 글자를 기준으로 tokenization을 하는건데요.
이전 예시를 character 기반 tokenization을 하면 다음과 같습니다.

"I have a meal" -> ['I', 'h', 'a', 'v', 'e', 'a', 'm', 'e', 'a', 'l']
"나는 밥을 먹는다" -> ['나', '는', '밥', '을', '먹', '는', '다']

그러나, character 기반 tokenization 역시 지나치게 긴 sequence 길이, 성능 저하 등의 문제를 겪으며
subword tokenization이 각광을 받게 되었습니다.

Subword tokenization

❓ word tokenization 기법과 구분되는 subword tokenization의 장점을 하나만 더 생각해볼까요?

subword tokenization의 장점은 Out-of-vocabulary (OOV) 문제에서 상대적으로 자유롭다는 것입니다.

일반적으로 subword들은 최소 철자 단위에서 하나씩 더 긴 subword를 추가하는 방식으로 만들어집니다.

예를 들어, 영어의 경우 a~z의 알파벳부터 시작해서 두글자, 세글자, 네글자 subword 등으로 확장해나가며
subword를 추가해 단어를 구성하고 이를 바탕으로 subword tokenization을 수행하기 때문에 다른 언어를 tokenization하지 않는다면
OOV 문제에서 자유롭다고 볼 수 있습니다.

대표적인 subword tokenization에 사용되는 algorithm 중 하나인 BPE, Word2vec 등이 있습니다.

그럼 이제부터 BERT 모델에서 사용하는 subword tokenization algorithm을 이용해 language modeling task를 수행해보겠습니다.
subword tokenizer는 transformers 라이브러리를 이용해 쉽게 불러올 수 있습니다.

(참고5: Huggingface: subword tokenization)

from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
# subword tokenization 예시
print(tokenizer.tokenize('Natural language expert training course'))
print(tokenizer.tokenize('Goorm X KAIST'))

이제 subword tokenizer로 Corpus를 만든 후 parameter 개수를 확인해보겠습니다.

class Corpus(object):
    def __init__(self, path):
        self.dictionary = Dictionary()    # dict가 항상 참조할 수 있게 붙어있는구나.
        self.train = self.tokenize(os.path.join(path, 'train.txt'))
        self.valid = self.tokenize(os.path.join(path, 'valid.txt'))
        self.test = self.tokenize(os.path.join(path, 'test.txt'))

    def tokenize(self, path):
        assert os.path.exists(path)
        # Add words to the dictionary
        with open(path, 'r', encoding="utf8") as f:
            for line in f:
                words = tokenizer.tokenize(line.strip()) + ['<eos>']
                for word in words:
                    self.dictionary.add_word(word)

        # Tokenize file content
        with open(path, 'r', encoding="utf8") as f:
            idss = []
            for line in f:
                words = tokenizer.tokenize(line.strip()) + ['<eos>']
                ids = []
                for word in words:
                    ids.append(self.dictionary.word2idx[word])
                idss.append(torch.tensor(ids).type(torch.int64))
            ids = torch.cat(idss)

        return ids
        
subword_corpus = Corpus('./data/wikitext-2')
ntokens = len(subword_corpus.dictionary)
subwordmodel = RNNModel(args.model, ntokens, args.emsize, args.nhid, args.nlayers, args.dropout)

print(f"Word embedding parameter 개수: {count_parameters(subwordmodel.encoder)}")
print(f"RNN parameter 개수: {count_parameters(subwordmodel.rnn)}")

Word embedding parameter 개수: 4619000
RNN parameter 개수: 890880

이전에 비해 embedding parameter 개수는 확연히 줄어들었습니다.
6,655,600개 -> 4,619,000개
이렇듯 subword tokenization 기반 언어모델은 embedding parameter의 개수가 감소하여 overfitting을 방지할 수 있고 Out-of-Vocabulary 상황에 잘 대처할 수 있습니다.

아래는 이제 subword 기반의 언어 모델 구축하는 예제입니다.

Reference: Pytorch Language Model(https://github.com/pytorch/examples/tree/master/word_language_model)

###############################################################################
# Load data
###############################################################################

# Starting from sequential data, batchify arranges the dataset into columns.
# For instance, with the alphabet as the sequence and batch size 4, we'd get
# ┌ a g m s ┐
# │ b h n t │
# │ c i o u │
# │ d j p v │
# │ e k q w │
# └ f l r x ┘.
# These columns are treated as independent by the model, which means that the
# dependence of e. g. 'g' on 'f' can not be learned, but allows more efficient
# batch processing.

def batchify(data, bsz):
    # Work out how cleanly we can divide the dataset into bsz parts.
    nbatch = data.size(0) // bsz
    # Trim off any extra elements that wouldn't cleanly fit (remainders).
    data = data.narrow(0, 0, nbatch * bsz)
    # Evenly divide the data across the bsz batches.
    data = data.view(bsz, -1).t().contiguous()
    return data.to(device)

eval_batch_size = 10
train_data = batchify(subword_corpus.train, args.batch_size)
val_data = batchify(subword_corpus.valid, eval_batch_size)
test_data = batchify(subword_corpus.test, eval_batch_size)

###############################################################################
# Build the model
###############################################################################

model = RNNModel(args.model, ntokens, args.emsize, args.nhid, args.nlayers, args.dropout).to(device)
criterion = nn.NLLLoss()

###############################################################################
# Training code1 - define functions
###############################################################################

def repackage_hidden(h):
    """Wraps hidden states in new Tensors, to detach them from their history."""

    if isinstance(h, torch.Tensor):
        return h.detach()
    else:
        return tuple(repackage_hidden(v) for v in h)


# get_batch subdivides the source data into chunks of length args.bptt.
# If source is equal to the example output of the batchify function, with
# a bptt-limit of 2, we'd get the following two Variables for i = 0:
# ┌ a g m s ┐ ┌ b h n t ┐
# └ b h n t ┘ └ c i o u ┘
# Note that despite the name of the function, the subdivison of data is not
# done along the batch dimension (i.e. dimension 1), since that was handled
# by the batchify function. The chunks are along dimension 0, corresponding
# to the seq_len dimension in the LSTM.

def get_batch(source, i):
    seq_len = min(args.bptt, len(source) - 1 - i)
    data = source[i:i+seq_len]
    target = source[i+1:i+1+seq_len].view(-1)
    return data, target


def evaluate(data_source):
    # Turn on evaluation mode which disables dropout.
    model.eval()
    total_loss = 0.
    ntokens = len(subword_corpus.dictionary)
    hidden = model.init_hidden(eval_batch_size)
    with torch.no_grad():
        for i in range(0, data_source.size(0) - 1, args.bptt):
            data, targets = get_batch(data_source, i)
            output, hidden = model(data, hidden)
            hidden = repackage_hidden(hidden)
            total_loss += len(data) * criterion(output, targets).item()
    return total_loss / (len(data_source) - 1)


def train():
    # Turn on training mode which enables dropout.
    model.train()
    total_loss = 0.
    start_time = time.time()
    ntokens = len(subword_corpus.dictionary)
    hidden = model.init_hidden(args.batch_size)
    for batch, i in enumerate(range(0, train_data.size(0) - 1, args.bptt)):
        data, targets = get_batch(train_data, i)
        # Starting each batch, we detach the hidden state from how it was previously produced.
        # If we didn't, the model would try backpropagating all the way to start of the dataset.
        model.zero_grad()

        hidden = repackage_hidden(hidden)
        output, hidden = model(data, hidden)

        loss = criterion(output, targets)
        loss.backward()

        # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
        torch.nn.utils.clip_grad_norm_(model.parameters(), args.clip)
        for p in model.parameters():
            p.data.add_(p.grad, alpha=-lr)

        total_loss += loss.item()

        if batch % args.log_interval == 0 and batch > 0:
            cur_loss = total_loss / args.log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.2f} | ms/batch {:5.2f} | '
                    'loss {:5.2f} | ppl {:8.2f}'.format(
                epoch, batch, len(train_data) // args.bptt, lr,
                elapsed * 1000 / args.log_interval, cur_loss, math.exp(cur_loss)))
            total_loss = 0
            start_time = time.time()
        if args.dry_run:
            break
            
###############################################################################
# Training code2 - run 
###############################################################################

# Loop over epochs.
lr = args.lr
best_val_loss = None

# At any point you can hit Ctrl + C to break out of training early.
try:
    for epoch in range(1, args.epochs+1):
        epoch_start_time = time.time()
        train()
        val_loss = evaluate(val_data)
        print('-' * 89)
        print('| end of epoch {:3d} | time: {:5.2f}s | valid loss {:5.2f} | '
                'valid ppl {:8.2f}'.format(epoch, (time.time() - epoch_start_time),
                                           val_loss, math.exp(val_loss)))
        print('-' * 89)
        # Save the model if the validation loss is the best we've seen so far.
        if not best_val_loss or val_loss < best_val_loss:
            with open(args.save, 'wb') as f:
                torch.save(model, f)
            best_val_loss = val_loss
        else:
            # Anneal the learning rate if no improvement has been seen in the validation dataset.
            lr /= 4.0
except KeyboardInterrupt:
    print('-' * 89)
    print('Exiting from training early')

# Load the best saved model.
with open(args.save, 'rb') as f:
    model = torch.load(f)
    # after load the rnn params are not a continuous chunk of memory
    # this makes them a continuous chunk, and will speed up forward pass
    # Currently, only rnn model supports flatten_parameters function.
    if args.model in ['RNN_TANH', 'RNN_RELU', 'LSTM', 'GRU']:
        model.rnn.flatten_parameters()

# Run on test data.
test_loss = evaluate(test_data)
print('=' * 89)
print('| End of training | test loss {:5.2f} | test ppl {:8.2f}'.format(
    test_loss, math.exp(test_loss)))
print('=' * 89)

학습한 언어 모델로 텍스트 생성

###############################################################################
# Language Modeling on Wikitext-2
#
# This file generates new sentences sampled from the language model
#
###############################################################################

import torch

# Model parameters.
test_args = easydict.EasyDict({
    "data"      : './data/wikitext-2',  # location of data corpus
    "checkpoint": './model.pt',         # model checkpoint to use
    "outf"      : 'generate_.txt',       # output file for generated text
    "words"     : 1000,                 # number of words to generate
    "seed"      : 1111,                 # random seed
    "cuda"      : True,                 # use CUDA
    "temperature": 1.0,                 # temperature - higher will increase diversity
    "log_interval": 100                 # reporting interval
})

# Set the random seed manually for reproducibility.
torch.manual_seed(test_args.seed)
if torch.cuda.is_available():
    if not test_args.cuda:
        print("WARNING: You have a CUDA device, so you should probably run with --cuda")

device = torch.device("cuda" if test_args.cuda else "cpu")

if test_args.temperature < 1e-3:
    parser.error("--temperature has to be greater or equal 1e-3")

with open(test_args.checkpoint, 'rb') as f:
    model = torch.load(f).to(device)
model.eval()

# corpus = Corpus(test_args.data)
# ntokens = len(subword_corpus.dictionary)

hidden = model.init_hidden(1)
input = torch.randint(ntokens, (1, 1), dtype=torch.long).to(device)

with open(test_args.outf, 'w') as outf:
    with torch.no_grad():  # no tracking history
        for i in range(test_args.words):
            output, hidden = model(input, hidden)
            word_weights = output.squeeze().div(test_args.temperature).exp().cpu()
            word_idx = torch.multinomial(word_weights, 1)[0]
            input.fill_(word_idx)

            word = subword_corpus.dictionary.idx2word[word_idx]

            outf.write(word + ('\n' if i % 20 == 19 else ' '))

            if i % test_args.log_interval == 0:
                print('| Generated {}/{} words'.format(i, test_args.words))
profile
멋진 챗봇을 만들꺼야

0개의 댓글