Transformer

짬그브 · April 24, 2025

Reference: https://wikidocs.net/166787

Attention

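The basic idea of attention is that at every time step where the decoder predicts an output word, it consults the entire input sentence from the encoder once more.

It does not weigh every part of the input sentence equally, however. It pays more attention to the input words that are relevant to the word being predicted at that time step.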

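Key-Value function

To understand the attention mechanism, you first need to understand the key-value approach.

A dictionary data type consists of pairs of a key and a value, and its defining feature is that a key can be used to look up the value mapped to it.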

Attention(Q,K,V) = Attention Value

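Q = Query: the hidden state of the decoder cell at time step t
K = Keys: the hidden states of the encoder cells at all time steps
V = Values: the hidden states of the encoder cells at all time steps

The attention function computes the similarity between the given query and every key. It then applies each similarity as a weight to the value mapped to that key, and returns the sum of the weighted values. This result is called the attention value.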
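The three steps described above (similarity, weighting, summation) can be sketched in plain Python. This is a minimal illustration, not the implementation used by any library: the dot product as the similarity measure, the softmax normalization, and the toy vectors are all assumptions made for the example.

```python
import math

def softmax(scores):
    """Turn raw similarity scores into weights that sum to 1."""
    peak = max(scores)
    exps = [math.exp(s - peak) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

def attention(query, keys, values):
    """Return (attention_value, weights) for a single query vector."""
    # 1) similarity between the query and every key (dot product: an assumption)
    scores = [sum(q * k for q, k in zip(query, key)) for key in keys]
    # 2) normalize the similarities into weights
    weights = softmax(scores)
    # 3) weighted sum of the values mapped to each key
    dim = len(values[0])
    attention_value = [
        sum(w * value[d] for w, value in zip(weights, values)) for d in range(dim)
    ]
    return attention_value, weights

# Toy encoder hidden states, used as both keys and values (made-up numbers)
encoder_states = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
# Toy decoder hidden state at time step t
decoder_state = [2.0, 0.0]

attention_value, weights = attention(decoder_state, encoder_states, encoder_states)
print(weights)
print(attention_value)
```

The second encoder state is orthogonal to the query, so it receives the smallest weight and contributes least to the attention value.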

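Using the pipeline

The most basic object in the Transformers library is the pipeline() function.

It connects a model with the preprocessing and postprocessing steps that model needs, so you can pass in text directly and get back an easy-to-understand answer.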

Tokenizer

The tokenizer is one of the core components of an NLP pipeline. It has a single purpose: to convert input text into data the model can process.
Models can only process numbers, so the tokenizer has to convert text input into numerical data.
This section looks at exactly what happens in the tokenization pipeline.

The data handled in NLP tasks is usually raw text. Here is an example of raw text.

Jim Henson was a puppeteer

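However, models can only process numbers, so we need a way to convert raw text into numbers. That is the tokenizer's job, and there are several ways to do it. The goal of a tokenizer is to find the most meaningful representation, that is, the one that suits the model best while being as compact as possible.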
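As a rough illustration of turning raw text into numbers, here is a toy word-level tokenizer. It is only a sketch: the helper names build_vocab and encode are hypothetical, and real tokenizers such as the ones used later in this post split text into subwords rather than whitespace-separated words.

```python
def build_vocab(sentences):
    """Assign an integer id to every distinct lowercase word; id 0 is reserved for unknown words."""
    vocab = {"[UNK]": 0}
    for sentence in sentences:
        for word in sentence.lower().split():
            if word not in vocab:
                vocab[word] = len(vocab)
    return vocab

def encode(text, vocab):
    """Map each word to its id, falling back to the [UNK] id."""
    return [vocab.get(word, vocab["[UNK]"]) for word in text.lower().split()]

vocab = build_vocab(["Jim Henson was a puppeteer"])
ids = encode("Jim Henson was a puppeteer", vocab)
print(ids)      # [1, 2, 3, 4, 5]

unknown = encode("Jim was a magician", vocab)
print(unknown)  # [1, 3, 4, 0]
```

A word-level vocabulary like this maps every unseen word to [UNK], which is one reason subword methods are preferred in practice.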

Site for using models (huggingface.co)

https://huggingface.co/

Transformers documentation

https://huggingface.co/docs/transformers/index

Additional installation

Hugging Face example

import math
import torch
import torch.nn as nn
import torch.nn.functional as F



def get_device():
    device = ''
    if torch.cuda.is_available():
        device = 'cuda'
    elif torch.backends.mps.is_available():
        device = 'mps'
    else:
        device = 'cpu'
    return device

device = get_device()
print(device)
'''
cuda
'''

with open('data/tokenizer_train.txt','r') as file:
    dataset = [line.strip() for line in file.readlines()]
print(dataset)
'''
['It open-source library offering a wide-range of pre-trained models (over 190).', 'It has over 24K ready-to-use datasets for ML.', 'Flexible library to handle pre-processing and tokenization of text data for NLP', 'It supports integration with multiple frameworks (PyTorch, TensorFlow, and JAX/Flax). PyTorch is supported by all models. Other frameworks have limited support.', 'Enable user to deploy model from HuggingFace with just few lines of code. Overall, allows production-ready model deployment for small to large-scale project', 'Users can build web applications, host demos, and collaborate with the community in a user-friendly environment.']
'''

from tokenizers import Tokenizer, models, pre_tokenizers, trainers

tokenizer = Tokenizer(models.BPE()) # byte pair encoding
tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()

trainer = trainers.BpeTrainer(special_tokens=["[UNK]","[CLS]","[SEP]","[PAD]","[MASK]"])
tokenizer.train_from_iterator(dataset, trainer=trainer)

tokenizer.save('tokenizer.json')

from transformers import PreTrainedTokenizerFast
fast_tokenizer = PreTrainedTokenizerFast(
    tokenizer_file = ('tokenizer.json')
)

text = 'The tokenizer words'
encoded = tokenizer.encode(text)

print(encoded)
'''
Encoding(num_tokens=10, attributes=[ids, type_ids, tokens, offsets, attention_mask, special_tokens_mask, overflowing])
'''
print(encoded.tokens)
'''
['T', 'h', 'e', 'token', 'iz', 'er', 'w', 'or', 'd', 's']
'''
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
print(tokenizer.tokenize('The tokenize word'))
'''
['the', 'token', '##ize', 'word']
'''

from datasets import load_dataset
from transformers import AutoTokenizer

imdb_dataset = load_dataset('imdb')
print(imdb_dataset)
'''
DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 25000
    })
    test: Dataset({
        features: ['text', 'label'],
        num_rows: 25000
    })
    unsupervised: Dataset({
        features: ['text', 'label'],
        num_rows: 50000
    })
})

'''
print(imdb_dataset['test'])
'''
Dataset({
    features: ['text', 'label'],
    num_rows: 25000
})
'''
sample_text = imdb_dataset['test'][0]['text']
print()
print(sample_text)
'''
I love sci-fi and am willing to put up with a lot. Sci-fi movies/TV are usually underfunded, under-appreciated and misunderstood. I tried to like this, I really did, but it is to good TV sci-fi as Babylon 5 is to Star Trek (the original). Silly prosthetics, cheap cardboard sets, stilted dialogues, CG that doesn't match the background, and painfully one-dimensional characters cannot be overcome with a 'sci-fi' setting. (I'm sure there are those of you out there who think Babylon 5 is good sci-fi TV. It's not. It's clichéd and uninspiring.) While US viewers might like emotion and character development, sci-fi is a genre that does not take itself seriously (cf. Star Trek). It may treat important issues, yet not as a serious philosophy. It's really difficult to care about the characters here as they are not simply foolish, just missing a spark of life. Their actions and reactions are wooden and predictable, often painful to watch. The makers of Earth KNOW it's rubbish as they have to always say "Gene Roddenberry's Earth..." otherwise people would not continue watching. Roddenberry's ashes must be turning in their orbit as this dull, cheap, poorly edited (watching it without advert breaks really brings this home) trudging Trabant of a show lumbers into space. Spoiler. So, kill off a main character. And then bring him back as another actor. Jeeez! Dallas all over again.
'''
model_name = 'distilbert/distilbert-base-uncased-finetuned-sst-2-english'
tokenizer = AutoTokenizer.from_pretrained(model_name)

from transformers import AutoModelForSequenceClassification, pipeline
model = AutoModelForSequenceClassification.from_pretrained(model_name)
sentiment_analysis_pipeline = pipeline('sentiment-analysis', model=model, tokenizer=tokenizer)

result = sentiment_analysis_pipeline(sample_text)
print('sample text: ', sample_text)
'''
sample text:  I love sci-fi and am willing to put up with a lot. Sci-fi movies/TV are usually underfunded, under-appreciated and misunderstood. I tried to like this, I really did, but it is to good TV sci-fi as Babylon 5 is to Star Trek (the original). Silly prosthetics, cheap cardboard sets, stilted dialogues, CG that doesn't match the background, and painfully one-dimensional characters cannot be overcome with a 'sci-fi' setting. (I'm sure there are those of you out there who think Babylon 5 is good sci-fi TV. It's not. It's clichéd and uninspiring.) While US viewers might like emotion and character development, sci-fi is a genre that does not take itself seriously (cf. Star Trek). It may treat important issues, yet not as a serious philosophy. It's really difficult to care about the characters here as they are not simply foolish, just missing a spark of life. Their actions and reactions are wooden and predictable, often painful to watch. The makers of Earth KNOW it's rubbish as they have to always say "Gene Roddenberry's Earth..." otherwise people would not continue watching. Roddenberry's ashes must be turning in their orbit as this dull, cheap, poorly edited (watching it without advert breaks really brings this home) trudging Trabant of a show lumbers into space. Spoiler. So, kill off a main character. And then bring him back as another actor. Jeeez! Dallas all over again.
'''
print('sentiment analysis result : ', result)
'''
sentiment analysis result :  [{'label': 'NEGATIVE', 'score': 0.999616265296936}]
'''

from torch.utils.data import Dataset
import torch

def preprocess(data):
    dataset = []
    for example in data:
        text = example['text'].lower()
        label = example['label']
        dataset.append({'text':text, 'label':label})

    return dataset

train_data = preprocess(imdb_dataset['train'])

class CustomDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

train_dataset = CustomDataset(train_data)
print(train_dataset[0])
'''
{'text': 'i rented i am curious-yellow from my video store because of all the controversy that surrounded it when it was first released in 1967. i also heard that at first it was seized by u.s. customs if it ever tried to enter this country, therefore being a fan of films considered "controversial" i really had to see this for myself.<br /><br />the plot is centered around a young swedish drama student named lena who wants to learn everything she can about life. in particular she wants to focus her attentions to making some sort of documentary on what the average swede thought about certain political issues such as the vietnam war and race issues in the united states. in between asking politicians and ordinary denizens of stockholm about their opinions on politics, she has sex with her drama teacher, classmates, and married men.<br /><br />what kills me about i am curious-yellow is that 40 years ago, this was considered pornographic. really, the sex and nudity scenes are few and far between, even then it\'s not shot like some cheaply made porno. while my countrymen mind find it shocking, in reality sex and nudity are a major staple in swedish cinema. even ingmar bergman, arguably their answer to good old boy john ford, had sex scenes in his films.<br /><br />i do commend the filmmakers for the fact that any sex shown in the film is shown for artistic purposes rather than just to shock people and make money to be shown in pornographic theaters in america. i am curious-yellow is a good film for anyone wanting to study the meat and potatoes (no pun intended) of swedish cinema. but really, this film doesn\'t have much of a plot.', 'label': 0}
'''
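The BPE tokenizer trained in the example above builds its vocabulary by repeatedly merging the most frequent pair of adjacent symbols. The following is a minimal sketch of that idea in plain Python, not the tokenizers library's actual implementation; the function names and the tiny corpus are made up for illustration.

```python
from collections import Counter

def count_pairs(words):
    """Count adjacent symbol pairs over a {symbol_tuple: frequency} corpus."""
    pairs = Counter()
    for symbols, freq in words.items():
        for left, right in zip(symbols, symbols[1:]):
            pairs[(left, right)] += freq
    return pairs

def merge_pair(words, pair):
    """Replace every occurrence of `pair` with a single merged symbol."""
    merged = {}
    for symbols, freq in words.items():
        out, i = [], 0
        while i < len(symbols):
            if i + 1 < len(symbols) and (symbols[i], symbols[i + 1]) == pair:
                out.append(symbols[i] + symbols[i + 1])
                i += 2
            else:
                out.append(symbols[i])
                i += 1
        key = tuple(out)
        merged[key] = merged.get(key, 0) + freq
    return merged

def train_bpe(corpus, num_merges):
    """Start from characters and apply `num_merges` greedy merges."""
    words = dict(Counter(tuple(word) for word in corpus.split()))
    merges = []
    for _ in range(num_merges):
        pairs = count_pairs(words)
        if not pairs:
            break
        best = max(pairs, key=pairs.get)  # most frequent pair (first one wins ties)
        merges.append(best)
        words = merge_pair(words, best)
    return merges, words

merges, words = train_bpe("low low lower lowest", num_merges=2)
print(merges)  # [('l', 'o'), ('lo', 'w')]
print(words)
```

With only six short training sentences, few merges are learned for words that never appear in them, which is consistent with 'The' being split into single characters in the output above.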

Transformer example 1

import math
import torch
import torch.nn as nn
import torch.nn.functional as F

def get_device():
  device="cpu"
  if torch.cuda.is_available():
    device="cuda"
  else:
    device="cpu"
  return device


device = get_device()
print(device)



class PositionalEncoding(nn.Module):
    def __init__(self, dim_embedding, dropout=0.1, max_seq_len=5000):
        super(PositionalEncoding, self).__init__()

        self.dropout = nn.Dropout(p=dropout)
        positional_encoding = torch.zeros(max_seq_len, dim_embedding)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        denom_term = torch.exp(torch.arange(0, dim_embedding, 2).float() * (-math.log(10000.0) / dim_embedding))
        positional_encoding[:,0::2] = torch.sin(position * denom_term)
        positional_encoding[:,1::2] = torch.cos(position * denom_term)
        positional_encoding = positional_encoding.unsqueeze(0).transpose(0,1)
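        self.register_buffer('positional_encoding', positional_encoding) # the positional encoding is not trained, but registering it as a buffer saves it in the state_dict and moves it with the module across devices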

    def forward(self, x):
        x = x + self.positional_encoding[:x.size(0), :]
        return self.dropout(x)

from datasets import load_dataset
from transformers import AutoTokenizer
from torch.utils.data import TensorDataset, DataLoader
import torch

# Load the dataset and tokenizer
dataset = load_dataset("imdb")
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

# Tokenize the dataset
def tokenize(batch):
    return tokenizer(batch["text"], padding=True, truncation=True, return_tensors="pt", max_length=512)

train_dataset = dataset["train"].map(tokenize, batched=True, batch_size=len(dataset["train"]))
val_dataset = dataset["test"].map(tokenize, batched=True, batch_size=len(dataset["test"]))

# Extract input_ids and attention_mask from the tokenized dataset
train_data = torch.tensor(train_dataset["input_ids"])
train_attention_mask = torch.tensor(train_dataset["attention_mask"])
train_labels = torch.tensor(train_dataset["label"])

val_data = torch.tensor(val_dataset["input_ids"])
val_attention_mask = torch.tensor(val_dataset["attention_mask"])
val_labels = torch.tensor(val_dataset["label"])

# Create the TensorDatasets
train_dataset = TensorDataset(train_data, train_attention_mask, train_labels)
val_dataset = TensorDataset(val_data, val_attention_mask, val_labels)

# Create the DataLoaders (with a custom collate function)
def collate_fn(batch):
    input_ids, attention_mask, labels = zip(*batch)
    input_ids = torch.stack(input_ids).transpose(0,1)
    attention_mask = torch.stack(attention_mask)
    labels = torch.nn.functional.one_hot(torch.tensor(labels), num_classes=2).float().to(device)
    return input_ids, attention_mask, labels

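'''
Why is only input_ids transposed?
nn.TransformerEncoder defaults to batch_first=False, so it expects its input in (seq_len, batch_size) order.
The mask built from attention_mask is passed as src_key_padding_mask, which is expected in (batch_size, seq_len) order, so it is left as is.
labels holds one label per example (one-hot here, shape (batch_size, num_classes)), so there is no sequence dimension to swap.

So only input_ids is transposed, to match the input format of the Transformer model.
'''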

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)  # collate_fn: used to change the batch layout
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)


class TextClassifier(nn.Module):
    def __init__(
        self, vocab_size, embedding_dim, nhead, num_layers, num_classes):
        super(TextClassifier, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim)
        # Create the Transformer encoder layers
        self.encoder_layer = nn.TransformerEncoderLayer(
            embedding_dim, nhead)
        self.encoder = nn.TransformerEncoder(
            self.encoder_layer, num_layers)
        self.fc = nn.Linear(embedding_dim, num_classes)
        self.embedding_dim = embedding_dim
        self.init_weights()

    def init_weights(self) -> None:
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        for layer in self.encoder.layers:
            nn.init.xavier_uniform_(layer.self_attn.out_proj.weight)
            nn.init.zeros_(layer.self_attn.out_proj.bias)
            nn.init.xavier_uniform_(layer.linear1.weight)
            nn.init.zeros_(layer.linear1.bias)
            nn.init.xavier_uniform_(layer.linear2.weight)
            nn.init.zeros_(layer.linear2.bias)
        self.fc.bias.data.zero_()
        self.fc.weight.data.uniform_(-initrange, initrange)

    def forward(self, x, key_padding_mask=None):
        '''
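        The input token indices x are converted to embedding vectors and then multiplied by the square root of the embedding dimension. This scaling accounts for the difference in value scale that depends on the embedding size.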
        '''
        x = self.embedding(x)* math.sqrt(self.embedding_dim)
        x = self.positional_encoding(x)
        x = self.encoder(x, src_key_padding_mask=key_padding_mask) # src_key_padding_mask is the padding mask: True marks the padded positions in the sequence to ignore

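        # Average over the first dimension (the sequence); the remaining dimensions are kept
        '''
        The output of the Transformer encoder still carries the sequence dimension. mean(dim=0) averages over that dimension,
        which gives a single representation of the overall meaning of the sequence.

        '''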
        x = x.mean(dim=0)

        # Fully connected layer for classification
        x = self.fc(x)
        x = torch.sigmoid(x)
        return x

import torch.optim as optim  # added the optim module import
import torch.nn as nn


vocab_size = tokenizer.vocab_size
embedding_dim = 512
nhead = 8
num_layers = 6
num_classes = 2

# Create the model

model = TextClassifier(vocab_size, embedding_dim, nhead, num_layers, num_classes).to(device)
criterion = nn.BCELoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)

#
# # Runtime: about 9 to 11 minutes
# num_epochs = 10
# for epoch in range(num_epochs):
#     i=0
#     for batch_data, batch_attention_mask, batch_labels in train_dataloader:
#
#         optimizer.zero_grad()
#
#         # Convert attention_mask to a boolean tensor (True = padding)
#         batch_attention_mask = (batch_attention_mask==0).to(device)
#
#         outputs = model(batch_data.to(device), key_padding_mask=batch_attention_mask)
#         loss = criterion(outputs, batch_labels.to(device))
#         if i % 100 == 0:
#             print("epoch ", epoch, "batch ", i, "loss", loss)
#
#         loss.backward()
#         torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
#         optimizer.step()
#         i = i+1
#
#     print(f"Epoch: {epoch + 1}, Loss: {loss.item()}")
#
#
# torch.save(model.state_dict(), "model_save/TextClassificationModel.pth")
# print()

# Initialize the tokenizer
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
vocab_size = tokenizer.vocab_size
embedding_dim = 512
nhead = 8
num_layers = 6 # the original book's code mistakenly listed 3; corrected here
num_classes = 2

# Create the model
model_loaded = TextClassifier(vocab_size, embedding_dim, nhead, num_layers,  num_classes).to(device)

# Load the trained model weights (map_location lets a GPU-saved checkpoint load on a CPU-only machine)
model_loaded.load_state_dict(torch.load('model_save/TextClassificationModel.pth', map_location=device), strict=False)
model_loaded.eval()
total_params = sum(p.numel() for p in model_loaded.parameters() if p.requires_grad)
print("Total trainable parameters:", total_params)
'''
Total trainable parameters: 37694978
'''
# Function that runs inference on a given text
def infer(text):
    # Tokenize the input text
    tokens = tokenizer.encode_plus(text, padding=True, truncation=True, return_tensors='pt', max_length=512)
    input_ids = tokens["input_ids"].to(device).transpose(0,1)

    attention_mask = tokens["attention_mask"]
    attention_mask = (attention_mask==0).to(device)
    print(input_ids.shape)
    '''
    torch.Size([8, 1])
    '''
    print(attention_mask)
    '''
    tensor([[False, False, False, False, False, False, False, False]],
       device='cuda:0')
    '''

    # Run inference
    with torch.no_grad():
        output = model_loaded(input_ids, key_padding_mask=attention_mask)

    # Drop the batch dimension to get the per-class probabilities
    probabilities = output.squeeze(0)
    return probabilities

# Test with a sample text
example_text = "This movie is good! ."
probabilities = infer(example_text)

print("Probabilities: ", probabilities)
'''
Probabilities:  tensor([0.5185, 0.4815], device='cuda:0')
'''
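The PositionalEncoding module in the example above fills even embedding indices with a sine and odd indices with a cosine of the position divided by a power of 10000. The sketch below recomputes the same table in plain Python so the values can be inspected without PyTorch; the function name and the small sizes are chosen only for illustration.

```python
import math

def positional_encoding(max_seq_len, dim_embedding):
    """Return a max_seq_len x dim_embedding table of sinusoidal position values."""
    table = [[0.0] * dim_embedding for _ in range(max_seq_len)]
    for pos in range(max_seq_len):
        for i in range(0, dim_embedding, 2):
            # Same term as exp(i * -log(10000) / dim_embedding) in the module above
            angle = pos / (10000 ** (i / dim_embedding))
            table[pos][i] = math.sin(angle)          # even index: sine
            if i + 1 < dim_embedding:
                table[pos][i + 1] = math.cos(angle)  # odd index: cosine
    return table

pe = positional_encoding(max_seq_len=4, dim_embedding=6)
for row in pe:
    print([round(value, 4) for value in row])
```

Position 0 always yields 0 for the sine slots and 1 for the cosine slots, and each later position gets its own distinct row, which is what lets the model tell positions apart.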

Transformer example 2

import math
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForCausalLM

def get_device():
  device="cpu"
  if torch.cuda.is_available():
    device="cuda"
  else:
    device="cpu"
  return device


device = get_device()
print(device)

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

class ShakespeareDataset(Dataset):
    def __init__(self, file_path, tokenizer, block_size=128):
        self.block_size = block_size
        self.tokenizer = tokenizer

        with open(file_path, 'r') as f:
            self.data = f.read()

        self.examples = []

        '''
        padding='max_length': always pads up to max_length, filling the unused positions with [PAD] tokens.

        truncation=True: cuts the example if it is longer than max_length.

        max_length=block_size: sets the length to block_size.

        return_tensors='pt': returns the result as PyTorch tensors (input_ids, attention_mask, and so on).
        '''
        for i in range(0, len(self.data)-self.block_size, self.block_size):
            example = self.data[i:i+self.block_size]
            tokenized = self.tokenizer(example, padding='max_length', truncation=True, max_length=block_size, return_tensors='pt')
            self.examples.append(tokenized)

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        input_ids = self.examples[idx]['input_ids'].squeeze()
        attention_mask = self.examples[idx]['attention_mask'].squeeze()
        return input_ids, attention_mask

filename = 'data/input.txt'
train_dataset = ShakespeareDataset(filename, tokenizer)

from torch.nn.utils.rnn import pad_sequence
def collate_fn(batch):
    inputs, masks = zip(*batch)
    inputs = torch.stack(inputs).transpose(0, 1)
    masks = torch.stack(masks)
    return inputs, masks
train_dataloader = DataLoader(train_dataset, batch_size=4, collate_fn=collate_fn,shuffle=True)

# Check the dimensions of the data
item=next(iter(train_dataloader))
input_ids,attention_masks=item
print(input_ids.shape, attention_masks.shape)

class PositionalEncoding(nn.Module):
    def __init__(self, dim_embedding, dropout=0.1, max_seq_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        positional_encoding = torch.zeros(max_seq_len, dim_embedding)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        denom_term = torch.exp(torch.arange(0, dim_embedding, 2).float() * (-math.log(10000.0) / dim_embedding))
        positional_encoding[:, 0::2] = torch.sin(position * denom_term)
        positional_encoding[:, 1::2] = torch.cos(position * denom_term)
        positional_encoding = positional_encoding.unsqueeze(0).transpose(0, 1)
        self.register_buffer('positional_encoding', positional_encoding)
    def forward(self, x):
        x = x + self.positional_encoding[:x.size(0), :]
        return self.dropout(x)


class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_layers, dropout):
        super().__init__()

        self.memory_embedding = nn.Embedding(vocab_size, embedding_dim)
        self.memory_pos_encoder = PositionalEncoding(embedding_dim, dropout)
        self.tgt_embedding = nn.Embedding(vocab_size, embedding_dim)
        self.tgt_pos_encoder = PositionalEncoding(embedding_dim, dropout)
        self.decoder = nn.TransformerDecoder(
            nn.TransformerDecoderLayer(d_model=embedding_dim, nhead=8,
                                       dim_feedforward=2048,
                                       dropout=dropout),
            num_layers=num_layers)

        self.fc = nn.Linear(embedding_dim, vocab_size)
        self.d_model=embedding_dim
        self.init_weights()

    def init_weights(self) -> None:
        initrange = 0.1

        # Initialize the embedding layers
        nn.init.uniform_(self.memory_embedding.weight, -initrange, initrange)
        nn.init.uniform_(self.tgt_embedding.weight, -initrange, initrange)

        # Initialize the decoder layers
        for param in self.decoder.parameters():
            if param.dim() > 1:
                nn.init.xavier_uniform_(param)

        # Initialize the output layer
        nn.init.uniform_(self.fc.weight, -initrange, initrange)
        nn.init.zeros_(self.fc.bias)

    def forward(self, tgt,  memory=None, tgt_mask=None, memory_mask=None, memory_key_padding_mask=None,tgt_key_padding_mask=None):
        tgt = self.tgt_embedding(tgt) * self.d_model ** 0.5 #scaling
        tgt=self.tgt_pos_encoder(tgt)

        memory=self.memory_embedding(memory) * self.d_model ** 0.5 # memory : encoder output
        memory=self.memory_pos_encoder(memory)

        output = self.decoder(
            tgt=tgt, memory=memory, tgt_mask=tgt_mask,
            memory_mask=memory_mask, # blocks the decoder from seeing specific positions of the encoder output (memory)
            memory_key_padding_mask=memory_key_padding_mask, # makes the decoder ignore the positions that are PAD tokens in the encoder input
            tgt_key_padding_mask=tgt_key_padding_mask # ignores the padded (<PAD>) positions in the decoder input (tgt)
            )
        print(output)
        output = self.fc(output)
        return output

def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=device)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask.to(device)


def create_mask(src, tgt,tokenizer_src=tokenizer,tokenizer_tgt=tokenizer):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len),device=device).type(torch.bool)

    src_padding_mask = (src == tokenizer_src.pad_token_id).transpose(0, 1)
    tgt_padding_mask = (tgt == tokenizer_tgt.pad_token_id).transpose(0, 1)
    return src_mask.to(device), tgt_mask.to(device), src_padding_mask.to(device), tgt_padding_mask.to(device)


model = TransformerDecoder(vocab_size=tokenizer.vocab_size, embedding_dim=768, num_layers=3, dropout=0.1)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from transformers import AutoTokenizer
from datasets import load_dataset

# Load the dataset and tokenizers
dataset = load_dataset("iwslt2017", "iwslt2017-de-en", split="train[:1%]", trust_remote_code=True)  # Only use a 1% portion of the dataset
tokenizer_src = AutoTokenizer.from_pretrained("bert-base-uncased")
tokenizer_tgt = AutoTokenizer.from_pretrained("bert-base-german-cased")
print(dataset[0])

class TranslationDataset(Dataset):
    def __init__(self, dataset, tokenizer_src, tokenizer_tgt, max_length=50):
        self.dataset = dataset
        self.tokenizer_src = tokenizer_src
        self.tokenizer_tgt = tokenizer_tgt
        self.max_length = max_length

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        src_text = self.dataset[idx]['translation']['en']
        tgt_text = self.dataset[idx]['translation']['de']

        src_tokens = self.tokenizer_src.encode_plus(
            src_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        tgt_tokens = self.tokenizer_tgt.encode_plus(
            tgt_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        return src_tokens["input_ids"].squeeze(),tgt_tokens["input_ids"].squeeze()

train_data = TranslationDataset(dataset, tokenizer_src, tokenizer_tgt)
# Check a sample after conversion to torch.tensor
print(train_data[2])

def collate_fn(batch):
    src_ids ,tgt_ids = zip(*batch)
    src_ids = torch.stack(src_ids).transpose(0, 1)
    tgt_ids = torch.stack(tgt_ids).transpose(0, 1)
    return src_ids, tgt_ids
dataloader = DataLoader(train_data, batch_size=16, shuffle=True, collate_fn=collate_fn)

item=next(iter(dataloader))
src_ids,tgt_ids=item
print('src_ids ',src_ids.shape)
print(' tgt_ids ',tgt_ids.shape)
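The generate_square_subsequent_mask function in the example above produces a matrix of 0 and -inf values that is added to the attention scores. The plain-Python sketch below shows why this hides future positions: after the softmax, every -inf entry becomes a weight of exactly 0. The helper names and the uniform scores are assumptions made for illustration.

```python
import math

NEG_INF = float("-inf")

def causal_mask(size):
    """mask[i][j] is 0.0 when position i may attend to position j (j <= i), else -inf."""
    return [[0.0 if j <= i else NEG_INF for j in range(size)] for i in range(size)]

def masked_softmax(scores, mask_row):
    """Add the mask to the scores, then normalize; masked entries end up with weight 0."""
    shifted = [s + m for s, m in zip(scores, mask_row)]
    peak = max(shifted)  # the diagonal is never masked, so peak is finite
    exps = [math.exp(v - peak) for v in shifted]
    total = sum(exps)
    return [e / total for e in exps]

mask = causal_mask(3)
uniform_scores = [1.0, 1.0, 1.0]  # made-up scores: every key looks equally similar
weights = [masked_softmax(uniform_scores, row) for row in mask]
for row in weights:
    print(row)
```

The first position can only attend to itself, the second splits its attention over the first two positions, and only the last position sees the whole sequence.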


Transformer example 3

import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer
from datasets import load_dataset

# GPU setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Tokenizer setup (add special tokens)
##################################################################################################################################################
tokenizer_src = AutoTokenizer.from_pretrained("bert-base-uncased")
tokenizer_tgt = AutoTokenizer.from_pretrained("bert-base-german-cased")

tokenizer_src.add_special_tokens({'pad_token': '[PAD]'})
tokenizer_tgt.add_special_tokens({'pad_token': '[PAD]'})
##################################################################################################################################################

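# International Workshop on Spoken Language Translation
# The IWSLT 2017 de-en configuration is a German/English translation dataset; this example uses English as the source and German as the target
# Load the dataset (1% sample)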
##################################################################################################################################################
dataset = load_dataset("iwslt2017", "iwslt2017-de-en",split="train[:1%]", trust_remote_code=True)

##################################################################################################################################################

# Custom Dataset
class TranslationDataset(Dataset):
    def __init__(self, dataset, tokenizer_src, tokenizer_tgt, max_length=50):
    ##################################################################################################################################################
        self.dataset = dataset
        self.tokenizer_src = tokenizer_src
        self.tokenizer_tgt = tokenizer_tgt
        self.max_length = max_length
    ##################################################################################################################################################

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
    ##################################################################################################################################################
        src_text = self.dataset[idx]['translation']['en']
        tgt_text = self.dataset[idx]['translation']['de']

        src = self.tokenizer_src.encode(src_text, padding='max_length', truncation=True, max_length=self.max_length)
        tgt = self.tokenizer_tgt.encode(tgt_text, padding='max_length', truncation=True, max_length=self.max_length)

        return torch.tensor(src), torch.tensor(tgt)

    ##################################################################################################################################################

##################################################################################################################################################
train_data = TranslationDataset(dataset, tokenizer_src, tokenizer_tgt)
##################################################################################################################################################

# Collate function
def collate_fn(batch):
##################################################################################################################################################
    src, tgt = zip(*batch)
    src = torch.stack(src).transpose(0,1) # (seq_len, batch)
    tgt = torch.stack(tgt).transpose(0,1)
    return src, tgt
##################################################################################################################################################

dataloader = DataLoader(train_data, batch_size=16, shuffle=True, collate_fn=collate_fn)

# PositionalEncoding definition
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

# Transformer Model
class TransformerModel(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, nhead=8, num_encoder_layers=3, num_decoder_layers=3, dim_feedforward=1024, dropout=0.1):
        super().__init__()
        ##################################################################################################################################################
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)

        self.transformer = nn.Transformer(
            d_model = d_model, nhead = nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout
        )

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.d_model = d_model
        ##################################################################################################################################################

    def forward(self, src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask):
    ##################################################################################################################################################
        src = self.src_embedding(src) * math.sqrt(self.d_model)
        tgt = self.tgt_embedding(tgt) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        tgt = self.pos_encoder(tgt)

        output = self.transformer(
            src, tgt, src_mask=src_mask, tgt_mask=tgt_mask,
            src_key_padding_mask = src_padding_mask,
            tgt_key_padding_mask = tgt_padding_mask,
            memory_key_padding_mask = src_padding_mask
        )
        return self.fc(output)

    ##################################################################################################################################################

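# Mask generation function
def generate_square_subsequent_mask(sz):
    '''
    Build an sz x sz matrix.
    Entries above the main diagonal are filled with -inf (via triu);
    all other entries are 0.
    The mask hides tokens after the current time step so the model cannot see future information.
    Example (sz=3):
    [[0,   -inf, -inf],
     [0,    0,   -inf],
     [0,    0,     0 ]]

    '''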
    ##################################################################################################################################################
    return torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1).to(device)
    ##################################################################################################################################################
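
To see why an additive 0 / -inf mask hides future positions, here is a dependency-free sketch that builds the same pattern with nested lists and adds it to a row of attention scores before a softmax. The helpers `causal_mask` and `softmax` and the score values are illustrative, not part of the original code.

```python
import math

def causal_mask(sz):
    """0.0 on and below the diagonal, -inf above it (future positions)."""
    return [[0.0 if j <= i else float("-inf") for j in range(sz)] for i in range(sz)]

def softmax(xs):
    m = max(xs)
    exps = [math.exp(x - m) for x in xs]  # exp(-inf) evaluates to 0.0
    total = sum(exps)
    return [e / total for e in exps]

mask = causal_mask(3)
scores = [1.0, 1.0, 1.0]  # made-up raw scores of one query against 3 keys

for i, mask_row in enumerate(mask):
    masked = [s + m for s, m in zip(scores, mask_row)]
    print(i, softmax(masked))  # weights on future positions come out as 0.0
```

Because the masked scores become -inf, their softmax weight is exactly zero, so time step i can only attend to positions 0..i.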

def create_mask(src, tgt, pad_id_src, pad_id_tgt):
    ##################################################################################################################################################
    src_seq_len, tgt_seq_len = src.size(0), tgt.size(0)
    src_mask = torch.zeros((src_seq_len, src_seq_len), device=device).type(torch.bool)
    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_padding_mask = (src == pad_id_src).transpose(0,1)
    tgt_padding_mask = (tgt == pad_id_tgt).transpose(0,1)

    ##################################################################################################################################################
    '''
    Example of the padding comparison (shown batch-first for readability):
    src = tensor([
    [2, 3, 4, 1],   # sentence 1 (1 is the pad token)
    [5, 6, 1, 1]    # sentence 2
    ])  # shape: (batch_size=2, seq_len=4)
    src == pad_id_src

    tensor([
    [False, False, False,  True],
    [False, False,  True,  True]
    ])
    '''
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask
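
The key padding masks passed to `nn.Transformer` are laid out as (batch_size, seq_len), while the token tensors in this example are sequence-first, (seq_len, batch_size). That is why `create_mask` transposes the comparison result. The list-based sketch below shows the same idea; `key_padding_mask` is a hypothetical helper, and `pad_id=1` is assumed as in the docstring example.

```python
def key_padding_mask(src_seq_first, pad_id):
    """src_seq_first[t][b] is the token at time step t of sentence b.

    Returns a (batch, seq_len) table that is True where the token is padding.
    """
    seq_len = len(src_seq_first)
    batch = len(src_seq_first[0])
    return [[src_seq_first[t][b] == pad_id for t in range(seq_len)] for b in range(batch)]

# Two sentences of length 4 stored sequence-first: shape (seq_len=4, batch=2)
src = [
    [2, 5],
    [3, 6],
    [4, 1],
    [1, 1],
]
for row in key_padding_mask(src, pad_id=1):
    print(row)
# [False, False, False, True]
# [False, False, True, True]
```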

# Create the model
model = TransformerModel(
    ##################################################################################################################################################
    src_vocab_size = len(tokenizer_src),
    tgt_vocab_size = len(tokenizer_tgt),
    d_model=512
    ##################################################################################################################################################
).to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-4)
##################################################################################################################################################
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer_tgt.pad_token_id)
##################################################################################################################################################

# # Training loop
# num_epochs = 500
# model.train()
# for epoch in range(num_epochs):
#     for i, (src, tgt) in enumerate(dataloader):
#         ##################################################################################################################################################
#         src = src.to(device)
#         tgt = tgt.to(device)
#
#         tgt_input = tgt[:-1, :]
#         tgt_out = tgt[1:, :]
#
#         src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(
#             src, tgt_input, tokenizer_src.pad_token_id, tokenizer_tgt.pad_token_id
#         )
#
#         preds = model(src, tgt_input, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)
#
#         optimizer.zero_grad()
#         loss = criterion(preds.reshape(-1, preds.shape[-1]), tgt_out.reshape(-1))
#         loss.backward()
#         optimizer.step()
#
#         ##################################################################################################################################################
#
#         if i % 10 == 0:
#             print(f"Epoch {epoch}, Step {i}, Loss: {loss.item():.4f}")
#
# #  Save the model
# torch.save(model.state_dict(), "model_save/transformer_translation.pth")

#  Load the saved model
model_loaded = TransformerModel(
    src_vocab_size=len(tokenizer_src),
    tgt_vocab_size=len(tokenizer_tgt),
    d_model=512
).to(device)
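# map_location lets a checkpoint saved on one device (e.g. cuda) load on another (e.g. cpu)
model_loaded.load_state_dict(torch.load("model_save/transformer_translation.pth", map_location=device))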
model_loaded.eval()

def translate_sentence(model, sentence, tokenizer_src, tokenizer_tgt, max_len=50):
    model.eval()
    src = tokenizer_src.encode(sentence, return_tensors="pt", truncation=True, max_length=max_len, padding="max_length").to(device)
    src = src.transpose(0,1) # (seq_len, 1)

    src_mask = torch.zeros((src.size(0), src.size(0)), device=device).type(torch.bool)
    src_padding_mask = (src == tokenizer_src.pad_token_id).transpose(0,1)

    memory = model.transformer.encoder(
        model.pos_encoder(model.src_embedding(src) * math.sqrt(model.d_model)),
        mask = src_mask,
        src_key_padding_mask = src_padding_mask
    )

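    # Start token: use [CLS] if the tokenizer defines one (compare with None, since id 0 is falsy), otherwise BOS
    start_id = tokenizer_tgt.cls_token_id if tokenizer_tgt.cls_token_id is not None else tokenizer_tgt.bos_token_id
    ys = torch.ones(1,1).fill_(start_id).type(torch.long).to(device)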

    for i in range(max_len - 1):
        tgt_mask = generate_square_subsequent_mask(ys.size(0))
        out = model.transformer.decoder(
            model.pos_encoder(model.tgt_embedding(ys) * math.sqrt(model.d_model)),
            memory,
            tgt_mask = tgt_mask.to(device),
            memory_key_padding_mask=src_padding_mask
        )
        out = model.fc(out)
        prob = out[-1, 0]
        next_token = prob.argmax().item()

        ys = torch.cat([ys, torch.ones(1,1).type_as(src.data).fill_(next_token)], dim=0)
        # tokenizer_tgt.sep_token_id (BERT) and tokenizer_tgt.eos_token_id (GPT) both mark the end of a sequence
        if next_token == tokenizer_tgt.sep_token_id or next_token == tokenizer_tgt.eos_token_id:
            break

    tgt_tokens = ys.squeeze().tolist()
    # skip_special_tokens=True => special tokens such as [CLS], [SEP], [PAD], [EOS] are left out of the output
    translation = tokenizer_tgt.decode(tgt_tokens, skip_special_tokens=True)
    return translation

test_sentence = "I want to go home"
translated = translate_sentence(model_loaded, test_sentence, tokenizer_src, tokenizer_tgt)
print("Translation:", translated)

'''
Translation: Wenn ich möchte, wohin ich möchte Ihnen alles zu meiner Geschichte.
'''
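
The loop inside `translate_sentence` is greedy decoding: at each step it feeds in the tokens generated so far, takes the arg-max token, appends it, and stops at an end-of-sequence token or once `max_len` is reached. The sketch below isolates that control flow. The toy `next_token_scores` function stands in for the decoder, and the token ids are made up for illustration.

```python
BOS, EOS = 0, 3  # hypothetical special-token ids

def next_token_scores(prefix):
    """Toy stand-in for the decoder: favors token (last + 1), capped at EOS."""
    target = min(prefix[-1] + 1, EOS)
    return [1.0 if tok == target else 0.0 for tok in range(EOS + 1)]

def greedy_decode(max_len=50):
    ys = [BOS]
    for _ in range(max_len - 1):
        scores = next_token_scores(ys)
        next_token = max(range(len(scores)), key=scores.__getitem__)  # arg-max
        ys.append(next_token)
        if next_token == EOS:
            break
    return ys

print(greedy_decode())  # [0, 1, 2, 3]
```

Greedy decoding is simple and deterministic, but it commits to the locally best token at every step. Beam search or sampling are common alternatives when output quality matters more than speed.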

transformer example 4

import pandas as pd

from sklearn.model_selection import train_test_split

from accelerate import Accelerator #pip install accelerate

import torch
from torch.utils.data import DataLoader

from tqdm import tqdm

from transformers import AutoTokenizer
from transformers import AutoModelForSequenceClassification
from transformers import get_scheduler


def get_device():
  device="cpu"
  if torch.cuda.is_available():
    device="cuda"
  else:
    device="cpu"
  return device


device = get_device()
print(device)

real=pd.read_csv('data/True.csv')
fake=pd.read_csv('data/Fake.csv')

real = real.drop(['title','subject','date'], axis=1)
real['label']=1.0
fake = fake.drop(['title','subject','date'], axis=1)
fake['label']=0.0
dataframe=pd.concat([real,fake],axis=0,ignore_index=True)

df = dataframe.sample(frac=0.1).reset_index(drop=True)
print(df.head(20))
print(len(df[df['label']==1.0]))
print(len(df[df['label']==0.0]))

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Build a list of (text, label) tuples

data = list(zip(df['text'].tolist(), df['label'].tolist()))
# The following function takes lists of texts and labels as parameters
# and returns input_ids, attention_masks, and labels_out
def tokenize_and_encode(texts, labels):
    input_ids, attention_masks, labels_out = [], [], []
    for text, label in zip(texts, labels):
        encoded = tokenizer.encode_plus(text, max_length=512, padding='max_length', truncation=True)
        input_ids.append(encoded['input_ids'])
        attention_masks.append(encoded['attention_mask'])
        labels_out.append(label)
    return torch.tensor(input_ids), torch.tensor(attention_masks), torch.tensor(labels_out)

# Unzip the tuples into a sequence of texts and a sequence of labels
texts, labels = zip(*data)

# Split into training and validation sets
train_texts, val_texts, train_labels, val_labels = train_test_split(texts,labels,test_size=0.2)

# Tokenize
train_input_ids, train_attention_masks, train_labels = tokenize_and_encode(train_texts, train_labels)
val_input_ids, val_attention_masks, val_labels = tokenize_and_encode(val_texts, val_labels)

print('train_input_ids ',train_input_ids[0].shape ,train_input_ids[0], '\n'
      'train_attention_masks ', train_attention_masks[0].shape ,train_attention_masks[0], '\n'
      'train_labels', train_labels[0])


class TextClassificationDataset(torch.utils.data.Dataset):
  def __init__(self, input_ids, attention_masks, labels, num_classes=2):
      self.input_ids = input_ids
      self.attention_masks = attention_masks
      self.labels = labels
      self.num_classes = num_classes
      self.one_hot_labels = self.one_hot_encode(labels, num_classes)
  def __len__(self):
    return len(self.input_ids)

  def __getitem__(self, idx):
      return {
          'input_ids': self.input_ids[idx],
          'attention_mask': self.attention_masks[idx],
          'labels': self.one_hot_labels[idx]
      }
  @staticmethod
  def one_hot_encode(targets, num_classes):
    targets = targets.long()
    one_hot_targets = torch.zeros(targets.size(0), num_classes)
    one_hot_targets.scatter_(1, targets.unsqueeze(1), 1.0)
    return one_hot_targets

train_dataset = TextClassificationDataset(train_input_ids, train_attention_masks, train_labels)
val_dataset = TextClassificationDataset(val_input_ids, val_attention_masks, val_labels)
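
`one_hot_encode` turns each class index into a row with a single 1.0, and the evaluation loop later recovers the index with an arg-max over that row. A list-based sketch of the round trip, using hypothetical helper names `one_hot` and `argmax`:

```python
def one_hot(targets, num_classes):
    """Map each class index (int or float) to a one-hot row of floats."""
    rows = []
    for t in targets:
        row = [0.0] * num_classes
        row[int(t)] = 1.0
        rows.append(row)
    return rows

def argmax(row):
    return max(range(len(row)), key=row.__getitem__)

encoded = one_hot([1.0, 0.0, 1.0], num_classes=2)
print(encoded)                       # [[0.0, 1.0], [1.0, 0.0], [0.0, 1.0]]
print([argmax(r) for r in encoded])  # [1, 0, 1]
```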

train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
eval_dataloader = DataLoader(val_dataset, batch_size=8)

print(len(train_dataset))
print(len(val_dataset))

item=next(iter(train_dataloader))
item_ids,item_mask,item_labels=item['input_ids'],item['attention_mask'],item['labels']
print ('item_ids, ',item_ids.shape, '\n',
       'item_mask, ',item_mask.shape, '\n',
       'item_labels, ',item_labels.shape, '\n',)

model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)


from torch.optim import AdamW
optimizer = AdamW(model.parameters(), lr=5e-5)

# Prepare the model, optimizer, and dataloaders
accelerator = Accelerator()
model, optimizer, train_dataloader, eval_dataloader = accelerator.prepare(
    model, optimizer, train_dataloader, eval_dataloader
)

# Import the metric function
from sklearn.metrics import accuracy_score

num_epochs = 1
num_training_steps = num_epochs * len(train_dataloader)
lr_scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps
)
progress_bar = tqdm(range(num_training_steps))
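
With the `"linear"` schedule and `num_warmup_steps=0`, the learning rate is expected to fall linearly from its initial value to 0 over `num_training_steps`. The plain-Python sketch below is written from that commonly documented behavior rather than copied from the library. The function name `linear_lr` and the step count of 1000 are assumptions for illustration.

```python
def linear_lr(step, base_lr, num_training_steps, num_warmup_steps=0):
    """Learning rate at a given step: linear warmup, then linear decay to 0."""
    if step < num_warmup_steps:
        return base_lr * step / max(1, num_warmup_steps)
    remaining = num_training_steps - step
    decay_span = max(1, num_training_steps - num_warmup_steps)
    return base_lr * max(0.0, remaining / decay_span)

for step in (0, 250, 500, 750, 1000):
    print(step, linear_lr(step, base_lr=5e-5, num_training_steps=1000))
```

Calling `lr_scheduler.step()` once per optimizer step, as the training loop does, advances `step` by one each time.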

for epoch in range(num_epochs):
    # Switch back to training mode at the start of every epoch
    model.train()
    for batch in train_dataloader:
        outputs = model(**batch)
        loss = outputs.loss
        accelerator.backward(loss)
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        progress_bar.update(1)
    model.eval()
    preds=[]
    out_label_ids = []

    for batch in eval_dataloader:
        with torch.no_grad():
            inputs = {k:v.to(device) for k, v, in batch.items()}
            outputs = model(**inputs)
            logits = outputs.logits

        preds.extend(torch.argmax(logits.detach().cpu(),dim=1).numpy())
        out_label_ids.extend(torch.argmax(inputs["labels"].detach().cpu(),dim=1).numpy())
    accuracy = accuracy_score(out_label_ids,preds)

    print(f"Epoch {epoch + 1}/{num_epochs} Evaluation Results:")
    print(f"Accuracy: {accuracy}")


from transformers import BertTokenizer
import torch
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

def inference(text, model,  label, device=device):
    # Tokenize the input text with the tokenizer loaded above
    inputs = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
    # Move the input tensors to the target device (defaults to the global device)
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Put the model in eval mode, then run inference
    model.eval()
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits

    # Extract the predicted label index
    pred_label_idx = torch.argmax(logits.detach().cpu(), dim=1).item()
    print(f"predicted label index: {pred_label_idx}, actual label {label}")
    return pred_label_idx

text="""
WASHINGTON (ABC) A confirmed tornado was located near Bridgeville in Sussex County, Delaware, shortly after 6 p.m. ET Saturday, moving east at 50 mph, according to the National Weather Service. Downed trees and wires were reported in the area.
"""
inference(text, model, 1.0)
text="this is definately junk text I am typing"
inference(text, model, 0.0)

transformer example 5

import torch
from torch.utils.data import DataLoader, Dataset
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "gpt2"
model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

#
# # Set the padding token
tokenizer.pad_token = tokenizer.eos_token

#
# # Load the dataset
dataset = load_dataset("tiny_shakespeare", trust_remote_code=True)
'''
DatasetDict({
    train: Dataset({
        features: ['text'],
        num_rows: 1
    })
    validation: Dataset({
        features: ['text'],
        num_rows: 1
    })
    test: Dataset({
        features: ['text'],
        num_rows: 1
    })
})
'''

# Split the continuous text into small chunks
def split_text(text, max_length=100):
    return [text[i:i+max_length] for i in range(0, len(text), max_length)]

# Apply split_text to the dataset

split_texts = split_text(dataset["train"]["text"][0])
print(split_texts)

# Tokenize the chunks stored in split_texts
tokenized_texts = tokenizer(split_texts, return_tensors="pt", padding=True, truncation=True)

class ShiftedDataset(Dataset):
    def __init__(self, encodings):
        self.encodings = encodings

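    def __getitem__(self, idx):
        input_ids = self.encodings["input_ids"][idx]
        attention_mask = self.encodings["attention_mask"][idx]
        # GPT2LMHeadModel shifts the labels internally when computing the loss,
        # so pass the unshifted input_ids; padded positions are set to -100 so the loss ignores them.
        labels = input_ids.clone()
        labels[attention_mask == 0] = -100
        return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}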


    def __len__(self):
        return len(self.encodings["input_ids"])
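
When building labels for a causal language model, it helps to spell out which position is scored against which token. The Transformers documentation describes the labels of causal LM heads such as `GPT2LMHeadModel` as being shifted inside the model, so the prediction at position t is compared with the label at position t+1. The sketch below shows that pairing in plain Python. The helper `training_pairs` and the token ids are made up for illustration.

```python
def training_pairs(input_ids, labels):
    """Pair each input position with its target the way an internally
    shifting loss does: predictions at [:-1] are compared with labels[1:]."""
    return list(zip(input_ids[:-1], labels[1:]))

ids = [10, 11, 12, 13]

# Labels equal to input_ids: each token is trained to predict the next one.
print(training_pairs(ids, ids))               # [(10, 11), (11, 12), (12, 13)]

# Labels shifted by hand beforehand: each token is paired with the token two steps ahead.
EOS = 99  # hypothetical end-of-text id
print(training_pairs(ids, ids[1:] + [EOS]))   # [(10, 12), (11, 13), (12, 99)]
```

Under that assumption, passing a copy of `input_ids` as `labels` is enough for next-token training.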


# # Create the DataLoader
train_dataset = ShiftedDataset(tokenized_texts)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=4)
item=next(iter(train_dataloader))
print(item['input_ids'])
print(item['attention_mask'])
print(item['labels'])


from accelerate import Accelerator
from transformers import GPT2LMHeadModel

# Initialize the Accelerator
accelerator = Accelerator()

# Set the training arguments
num_epochs = 50
learning_rate = 5e-5

# Initialize the GPT-2 model and optimizer
model = GPT2LMHeadModel.from_pretrained("gpt2")
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Prepare the model and optimizer for training with the Accelerator
model, optimizer, train_dataloader = accelerator.prepare(model,
                                                         optimizer,
                                                         train_dataloader)

epoch=10

from torch.optim import AdamW
from tqdm import tqdm

# Fine-tuning loop
for epoch in range(num_epochs):
    epoch_iterator = tqdm(train_dataloader, desc=f"Epoch {epoch + 1}")
    for step, batch in enumerate(epoch_iterator):
        optimizer.zero_grad()
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]

        outputs = model(input_ids=input_ids,
                        attention_mask=attention_mask, labels=labels)
        loss = outputs.loss

        accelerator.backward(loss)
        optimizer.step()

        if step % 500 == 0:
            epoch_iterator.set_postfix({"Loss": loss.item()}, refresh=True)

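    # Save the model every 5 epochs
    # Set model_save_path to the appropriate path in your Google Drive
    if (epoch + 1) % 5 == 0:
        model_save_path = f"model_save/text_generator_model/model_checkpoint_epoch_{epoch + 1}"
        # Unwrap first so save_pretrained is available even if accelerate wrapped the model
        accelerator.unwrap_model(model).save_pretrained(model_save_path)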
        print(f"Model saved at epoch {epoch + 1}")

accelerator.wait_for_everyone()
unwrapped_model = accelerator.unwrap_model(model)


model_path = 'model_save/text_generator_model/model_checkpoint_epoch_10'
tokenizer_path = 'gpt2'

unwrapped_model.save_pretrained(model_path)
tokenizer.save_pretrained(tokenizer_path)


from transformers import GPT2Tokenizer
from transformers import GPT2LMHeadModel

def generate_poem(prompt, model_path, tokenizer_path, max_words=50, max_seq_len=100, temperature=1.0):
    # Load the fine-tuned model and tokenizer
    model = GPT2LMHeadModel.from_pretrained(model_path)
    tokenizer = GPT2Tokenizer.from_pretrained(tokenizer_path)

    # Set the padding token and padding side
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = 'left'

    poem = ""
    remaining_words = max_words

    while remaining_words > 0:
        # Encode the prompt and generate text
        input_ids = tokenizer.encode(prompt, return_tensors="pt", padding=True, truncation=True,
                                     max_length=max_seq_len)
        attention_mask = torch.ones_like(input_ids)

        max_tokens = min(remaining_words * 5, max_seq_len) # assume each word averages 5 tokens
        output_ids = model.generate(
            input_ids,
            max_new_tokens = max_tokens,
            do_sample=True, # temperature only takes effect when sampling is enabled
            num_return_sequences=1, # number of sequences to return
            no_repeat_ngram_size=2, # prevent any n-gram of this size from repeating
            attention_mask=attention_mask,
            pad_token_id=tokenizer.pad_token_id,
            temperature=temperature, # higher = more creative, lower = closer to the training data
        )

        # Convert token IDs to text
        generated_text = tokenizer.decode(output_ids[0], skip_special_tokens=True)
        poem += generated_text
        remaining_words -= len(generated_text.split())

        # Use the last max_seq_len words of the generated text as the next prompt
        prompt = ' '.join(generated_text.split()[-max_seq_len:])
    return poem
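
The `temperature` argument rescales the logits before sampling: values below 1 sharpen the distribution toward the most likely token, and values above 1 flatten it. In `generate`, it only has an effect when sampling is enabled with `do_sample=True`. A minimal sketch with made-up logits, where `softmax_with_temperature` is a hypothetical helper:

```python
import math

def softmax_with_temperature(logits, temperature=1.0):
    """Divide logits by the temperature, then apply a numerically stable softmax."""
    scaled = [x / temperature for x in logits]
    m = max(scaled)
    exps = [math.exp(x - m) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 1.0, 0.0]  # made-up scores for three candidate tokens
for t in (0.5, 1.0, 2.0):
    probs = softmax_with_temperature(logits, t)
    print(t, [round(p, 3) for p in probs])
```

The probability of the top token shrinks as the temperature grows, which is what makes higher temperatures produce more varied text.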

import re

def post_process_poem(poem):
    # Collapse extra whitespace
    poem = re.sub(r'\s+',' ',poem).strip()

    # Capitalize the first letter of each sentence
    sentences = re.split(r'(?<=[\.\?!])\s', poem)
    formatted_sentences = [sentence.capitalize() for sentence in sentences]
    formatted_poem = ' '.join(formatted_sentences)

    # Insert line breaks after punctuation for readability
    line_breaks = re.compile(r'(?<=[,;:?!])\s')
    formatted_poem = line_breaks.sub('\n', formatted_poem)

    return formatted_poem

tokenizer_path = 'gpt2'
prompt = "love"
model_path = 'model_save/text_generator_model/model_checkpoint_epoch_10'
max_words = 50
temperature = 0.9  # adjust this value to increase or decrease randomness
generated_poem = generate_poem(prompt, model_path, tokenizer_path, max_words=max_words, temperature=temperature)
formatted_poem = post_process_poem(generated_poem)
print(formatted_poem)

