import torch
import torch.nn as nn
import math
import numpy as np
class PositionalEncoding(nn.Module):
"""
시퀀스 내 토큰 위치 정보를 인코딩하여 모델에 제공합니다.
"""
def __init__(self, d_model, max_len=5000):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
"""
입력 텐서에 위치 정보를 더합니다.
Args:
x: 입력 임베딩 텐서 [seq_len, batch_size, d_model]
Returns:
위치 정보가 추가된 텐서 [seq_len, batch_size, d_model]
"""
return x + self.pe[:x.size(0), :]
def scaled_dot_product_attention(query, key, value, mask=None):
"""
스케일드 닷-프로덕트 어텐션을 계산합니다.
Args:
query: 쿼리 텐서 [batch_size, num_heads, seq_len_q, d_k]
key: 키 텐서 [batch_size, num_heads, seq_len_k, d_k]
value: 값 텐서 [batch_size, num_heads, seq_len_v, d_k] (보통 seq_len_k = seq_len_v)
mask: 선택적 마스크 텐서 [batch_size, 1, seq_len_q, seq_len_k]
Returns:
출력 텐서와 어텐션 가중치
"""
d_k = key.size(-1)
scores = torch.matmul(query, key.transpose(-2, -1))
scores = scores / math.sqrt(d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
attention_weights = torch.nn.functional.softmax(scores, dim=-1)
output = torch.matmul(attention_weights, value)
return output, attention_weights
class MultiHeadAttention(nn.Module):
"""
여러 어텐션 헤드를 병렬로 계산하는 멀티 헤드 어텐션 모듈입니다.
"""
def __init__(self, d_model, num_heads, dropout=0.1):
super(MultiHeadAttention, self).__init__()
assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
self.wq = nn.Linear(d_model, d_model)
self.wk = nn.Linear(d_model, d_model)
self.wv = nn.Linear(d_model, d_model)
self.wo = nn.Linear(d_model, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, query, key, value, mask=None):
"""
멀티 헤드 어텐션 계산을 수행합니다.
Args:
query: 쿼리 텐서 [batch_size, seq_len_q, d_model]
key: 키 텐서 [batch_size, seq_len_k, d_model]
value: 값 텐서 [batch_size, seq_len_v, d_model]
mask: 선택적 마스크 텐서
Returns:
멀티 헤드 어텐션 출력 [batch_size, seq_len_q, d_model]
"""
batch_size = query.size(0)
q = self.wq(query)
k = self.wk(key)
v = self.wv(value)
q = q.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
k = k.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
v = v.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
attn_output, _ = scaled_dot_product_attention(q, k, v, mask)
attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
output = self.wo(attn_output)
output = self.dropout(output)
return output
class PositionWiseFeedForward(nn.Module):
"""
시퀀스의 각 위치에 독립적으로 적용되는 2층 피드 포워드 네트워크입니다.
"""
def __init__(self, d_model, d_ff, dropout=0.1):
super(PositionWiseFeedForward, self).__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
"""
포지션 와이즈 피드 포워드 네트워크 계산을 수행합니다.
Args:
x: 입력 텐서 [batch_size, seq_len, d_model]
Returns:
변환된 텐서 [batch_size, seq_len, d_model]
"""
output = torch.relu(self.fc1(x))
output = self.dropout(output)
output = self.fc2(output)
return output
class EncoderLayer(nn.Module):
"""
트랜스포머 인코더 레이어: 셀프 어텐션과 피드 포워드 네트워크로 구성됩니다.
"""
def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
super(EncoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
self.feed_forward = PositionWiseFeedForward(d_model, d_ff, dropout)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
def forward(self, x, mask):
"""
인코더 레이어 순전파 계산을 수행합니다.
Args:
x: 입력 텐서 [batch_size, seq_len, d_model]
mask: 패딩 마스크 텐서
Returns:
변환된 텐서 [batch_size, seq_len, d_model]
"""
attn_output = self.self_attn(x, x, x, mask)
x = self.norm1(x + self.dropout1(attn_output))
ff_output = self.feed_forward(x)
x = self.norm2(x + self.dropout2(ff_output))
return x
class DecoderLayer(nn.Module):
"""
트랜스포머 디코더 레이어: 마스크드 셀프 어텐션, 인코더-디코더 어텐션, 피드 포워드 네트워크로 구성됩니다.
"""
def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
super(DecoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
self.cross_attn = MultiHeadAttention(d_model, num_heads, dropout)
self.feed_forward = PositionWiseFeedForward(d_model, d_ff, dropout)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
self.dropout3 = nn.Dropout(dropout)
def forward(self, x, enc_output, look_ahead_mask, padding_mask):
"""
디코더 레이어 순전파 계산을 수행합니다.
Args:
x: 디코더 입력 텐서 [batch_size, seq_len, d_model]
enc_output: 인코더 출력 텐서 [batch_size, enc_seq_len, d_model]
look_ahead_mask: 룩-어헤드 마스크 (미래 위치 마스킹)
padding_mask: 패딩 마스크
Returns:
변환된 텐서 [batch_size, seq_len, d_model]
"""
self_attn_output = self.self_attn(x, x, x, look_ahead_mask)
x = self.norm1(x + self.dropout1(self_attn_output))
cross_attn_output = self.cross_attn(x, enc_output, enc_output, padding_mask)
x = self.norm2(x + self.dropout2(cross_attn_output))
ff_output = self.feed_forward(x)
x = self.norm3(x + self.dropout3(ff_output))
return x
class Encoder(nn.Module):
"""
트랜스포머 인코더: 여러 인코더 레이어를 쌓은 구조입니다.
"""
def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff,
max_seq_len=5000, dropout=0.1):
super(Encoder, self).__init__()
self.d_model = d_model
self.embedding = nn.Embedding(vocab_size, d_model)
self.pos_encoding = PositionalEncoding(d_model, max_seq_len)
self.enc_layers = nn.ModuleList([
EncoderLayer(d_model, num_heads, d_ff, dropout)
for _ in range(num_layers)
])
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask):
"""
인코더 순전파 계산을 수행합니다.
Args:
x: 입력 시퀀스 [batch_size, seq_len]
mask: 패딩 마스크
Returns:
인코더 출력 [batch_size, seq_len, d_model]
"""
seq_len = x.size(1)
x = self.embedding(x) * math.sqrt(self.d_model)
x = self.pos_encoding(x)
x = self.dropout(x)
for layer in self.enc_layers:
x = layer(x, mask)
return x
class Decoder(nn.Module):
"""
트랜스포머 디코더: 여러 디코더 레이어를 쌓은 구조입니다.
"""
def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff,
max_seq_len=5000, dropout=0.1):
super(Decoder, self).__init__()
self.d_model = d_model
self.embedding = nn.Embedding(vocab_size, d_model)
self.pos_encoding = PositionalEncoding(d_model, max_seq_len)
self.dec_layers = nn.ModuleList([
DecoderLayer(d_model, num_heads, d_ff, dropout)
for _ in range(num_layers)
])
self.dropout = nn.Dropout(dropout)
def forward(self, x, enc_output, look_ahead_mask, padding_mask):
"""
디코더 순전파 계산을 수행합니다.
Args:
x: 디코더 입력 시퀀스 [batch_size, seq_len]
enc_output: 인코더 출력 [batch_size, enc_seq_len, d_model]
look_ahead_mask: 룩-어헤드 마스크
padding_mask: 패딩 마스크
Returns:
디코더 출력 [batch_size, seq_len, d_model]
"""
seq_len = x.size(1)
x = self.embedding(x) * math.sqrt(self.d_model)
x = self.pos_encoding(x)
x = self.dropout(x)
for layer in self.dec_layers:
x = layer(x, enc_output, look_ahead_mask, padding_mask)
return x
class Transformer(nn.Module):
"""
완전한 트랜스포머 모델: 인코더와 디코더를 결합합니다.
"""
def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_layers=6,
num_heads=8, d_ff=2048, max_seq_len=5000, dropout=0.1):
super(Transformer, self).__init__()
self.encoder = Encoder(
src_vocab_size, d_model, num_layers, num_heads, d_ff,
max_seq_len, dropout
)
self.decoder = Decoder(
tgt_vocab_size, d_model, num_layers, num_heads, d_ff,
max_seq_len, dropout
)
self.final_layer = nn.Linear(d_model, tgt_vocab_size)
def create_masks(self, src, tgt):
"""
소스 및 타겟 시퀀스에 대한 마스크를 생성합니다.
Args:
src: 소스 시퀀스 [batch_size, src_seq_len]
tgt: 타겟 시퀀스 [batch_size, tgt_seq_len]
Returns:
인코더 패딩 마스크, 디코더 룩-어헤드 마스크, 디코더 패딩 마스크
"""
src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
tgt_padding_mask = (tgt != 0).unsqueeze(1).unsqueeze(2)
seq_len = tgt.size(1)
look_ahead_mask = torch.triu(torch.ones(1, seq_len, seq_len), diagonal=1).eq(0)
look_ahead_mask = look_ahead_mask.to(tgt.device)
combined_mask = look_ahead_mask & tgt_padding_mask
return src_mask, combined_mask, src_mask
def forward(self, src, tgt):
"""
트랜스포머 모델의 순전파 계산을 수행합니다.
Args:
src: 소스 시퀀스 [batch_size, src_seq_len]
tgt: 타겟 시퀀스 [batch_size, tgt_seq_len]
Returns:
출력 로짓 [batch_size, tgt_seq_len, tgt_vocab_size]
"""
enc_padding_mask, look_ahead_mask, dec_padding_mask = self.create_masks(src, tgt)
enc_output = self.encoder(src, enc_padding_mask)
dec_output = self.decoder(
tgt, enc_output, look_ahead_mask, dec_padding_mask
)
final_output = self.final_layer(dec_output)
return final_output