Transformer 핵심 요소 (Pytorch)

임정민·2025년 3월 19일

메모장

목록 보기
30/33
post-thumbnail
import torch
import torch.nn as nn
import math
import numpy as np

class PositionalEncoding(nn.Module):
    """
    시퀀스 내 토큰 위치 정보를 인코딩하여 모델에 제공합니다.
    """
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        
        # 포지셔널 인코딩 행렬 생성
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        
        # 짝수 인덱스(2i)에는 사인 함수 적용
        pe[:, 0::2] = torch.sin(position * div_term)
        # 홀수 인덱스(2i+1)에는 코사인 함수 적용
        pe[:, 1::2] = torch.cos(position * div_term)
        
        # 배치 차원 추가 및 전치 [max_len, d_model] -> [1, max_len, d_model]
        pe = pe.unsqueeze(0).transpose(0, 1)
        
        # 모델 파라미터가 아닌 버퍼로 등록 (학습되지 않음)
        self.register_buffer('pe', pe)
        
    def forward(self, x):
        """
        입력 텐서에 위치 정보를 더합니다.
        
        Args:
            x: 입력 임베딩 텐서 [seq_len, batch_size, d_model]
            
        Returns:
            위치 정보가 추가된 텐서 [seq_len, batch_size, d_model]
        """
        return x + self.pe[:x.size(0), :]
def scaled_dot_product_attention(query, key, value, mask=None):
    """
    스케일드 닷-프로덕트 어텐션을 계산합니다.
    
    Args:
        query: 쿼리 텐서 [batch_size, num_heads, seq_len_q, d_k]
        key: 키 텐서 [batch_size, num_heads, seq_len_k, d_k]
        value: 값 텐서 [batch_size, num_heads, seq_len_v, d_k] (보통 seq_len_k = seq_len_v)
        mask: 선택적 마스크 텐서 [batch_size, 1, seq_len_q, seq_len_k]
        
    Returns:
        출력 텐서와 어텐션 가중치
    """
    # d_k는 키 텐서의 마지막 차원
    d_k = key.size(-1)
    
    # Q와 K의 행렬 곱으로 어텐션 스코어 계산 (스케일링 전)
    # matmul: [batch_size, num_heads, seq_len_q, d_k] x [batch_size, num_heads, d_k, seq_len_k]
    # 결과: [batch_size, num_heads, seq_len_q, seq_len_k]
    scores = torch.matmul(query, key.transpose(-2, -1))
    
    # 스케일링: √d_k로 나누기
    scores = scores / math.sqrt(d_k)
    
    # 마스킹 적용 (패딩 토큰이나 미래 토큰을 가리기 위해)
    if mask is not None:
        # 마스크의 0인 위치에 매우 작은 값(-1e9)을 할당하여 softmax 후 해당 위치의 가중치가 0에 가까워지도록 함
        scores = scores.masked_fill(mask == 0, -1e9)
    
    # softmax 함수로 어텐션 가중치 계산
    # 각 쿼리에 대해 모든 키의 가중치 합이 1이 됨
    attention_weights = torch.nn.functional.softmax(scores, dim=-1)
    
    # 가중치와 값의 행렬 곱으로 출력 계산
    # [batch_size, num_heads, seq_len_q, seq_len_k] x [batch_size, num_heads, seq_len_v, d_k]
    # 결과: [batch_size, num_heads, seq_len_q, d_k]
    output = torch.matmul(attention_weights, value)
    
    return output, attention_weights
class MultiHeadAttention(nn.Module):
    """
    여러 어텐션 헤드를 병렬로 계산하는 멀티 헤드 어텐션 모듈입니다.
    """
    def __init__(self, d_model, num_heads, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        
        # d_model이 num_heads로 나누어떨어지는지 확인
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        self.d_model = d_model      # 모델 차원 (일반적으로 512)
        self.num_heads = num_heads  # 헤드 수 (일반적으로 8)
        self.d_k = d_model // num_heads  # 각 헤드의 차원
        
        # Q, K, V를 위한 선형 변환층
        # 각 행렬은 [d_model, d_model] 크기를 가짐
        self.wq = nn.Linear(d_model, d_model)
        self.wk = nn.Linear(d_model, d_model)
        self.wv = nn.Linear(d_model, d_model)
        
        # 출력 선형 변환층
        self.wo = nn.Linear(d_model, d_model)
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, query, key, value, mask=None):
        """
        멀티 헤드 어텐션 계산을 수행합니다.
        
        Args:
            query: 쿼리 텐서 [batch_size, seq_len_q, d_model]
            key: 키 텐서 [batch_size, seq_len_k, d_model]
            value: 값 텐서 [batch_size, seq_len_v, d_model]
            mask: 선택적 마스크 텐서
            
        Returns:
            멀티 헤드 어텐션 출력 [batch_size, seq_len_q, d_model]
        """
        batch_size = query.size(0)
        
        # 1. 선형 변환
        q = self.wq(query)  # [batch_size, seq_len_q, d_model]
        k = self.wk(key)    # [batch_size, seq_len_k, d_model]
        v = self.wv(value)  # [batch_size, seq_len_v, d_model]
        
        # 2. 헤드 분할
        # [batch_size, seq_len, d_model] -> [batch_size, seq_len, num_heads, d_k] -> [batch_size, num_heads, seq_len, d_k]
        q = q.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = k.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = v.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        
        # 3. 스케일드 닷-프로덕트 어텐션 적용
        attn_output, _ = scaled_dot_product_attention(q, k, v, mask)
        
        # 4. 헤드 연결
        # [batch_size, num_heads, seq_len_q, d_k] -> [batch_size, seq_len_q, num_heads, d_k] -> [batch_size, seq_len_q, d_model]
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        
        # 5. 최종 선형 변환
        output = self.wo(attn_output)
        output = self.dropout(output)
        
        return output
class PositionWiseFeedForward(nn.Module):
    """
    시퀀스의 각 위치에 독립적으로 적용되는 2층 피드 포워드 네트워크입니다.
    """
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionWiseFeedForward, self).__init__()
        
        # 첫 번째 선형 변환
        self.fc1 = nn.Linear(d_model, d_ff)
        # 두 번째 선형 변환
        self.fc2 = nn.Linear(d_ff, d_model)
        # 드롭아웃
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        """
        포지션 와이즈 피드 포워드 네트워크 계산을 수행합니다.
        
        Args:
            x: 입력 텐서 [batch_size, seq_len, d_model]
            
        Returns:
            변환된 텐서 [batch_size, seq_len, d_model]
        """
        # 첫 번째 선형 변환 후 ReLU 활성화 함수 적용
        output = torch.relu(self.fc1(x))
        # 드롭아웃 적용
        output = self.dropout(output)
        # 두 번째 선형 변환
        output = self.fc2(output)
        
        return output
class EncoderLayer(nn.Module):
    """
    트랜스포머 인코더 레이어: 셀프 어텐션과 피드 포워드 네트워크로 구성됩니다.
    """
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        
        # 멀티 헤드 어텐션
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        # 피드 포워드 네트워크
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff, dropout)
        
        # 두 개의 레이어 정규화 레이어
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        
        # 드롭아웃
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        """
        인코더 레이어 순전파 계산을 수행합니다.
        
        Args:
            x: 입력 텐서 [batch_size, seq_len, d_model]
            mask: 패딩 마스크 텐서
            
        Returns:
            변환된 텐서 [batch_size, seq_len, d_model]
        """
        # 1. 셀프 어텐션
        attn_output = self.self_attn(x, x, x, mask)
        
        # 2. 첫 번째 잔차 연결과 레이어 정규화
        # x + Dropout(MultiHeadAttention(x))
        x = self.norm1(x + self.dropout1(attn_output))
        
        # 3. 피드 포워드 네트워크
        ff_output = self.feed_forward(x)
        
        # 4. 두 번째 잔차 연결과 레이어 정규화
        # x + Dropout(FFN(x))
        x = self.norm2(x + self.dropout2(ff_output))
        
        return x
class DecoderLayer(nn.Module):
    """
    트랜스포머 디코더 레이어: 마스크드 셀프 어텐션, 인코더-디코더 어텐션, 피드 포워드 네트워크로 구성됩니다.
    """
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        
        # 마스크드 셀프 어텐션
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        # 인코더-디코더 어텐션
        self.cross_attn = MultiHeadAttention(d_model, num_heads, dropout)
        # 피드 포워드 네트워크
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff, dropout)
        
        # 세 개의 레이어 정규화 레이어
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        
        # 드롭아웃
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
        
    def forward(self, x, enc_output, look_ahead_mask, padding_mask):
        """
        디코더 레이어 순전파 계산을 수행합니다.
        
        Args:
            x: 디코더 입력 텐서 [batch_size, seq_len, d_model]
            enc_output: 인코더 출력 텐서 [batch_size, enc_seq_len, d_model]
            look_ahead_mask: 룩-어헤드 마스크 (미래 위치 마스킹)
            padding_mask: 패딩 마스크
            
        Returns:
            변환된 텐서 [batch_size, seq_len, d_model]
        """
        # 1. 마스크드 셀프 어텐션
        self_attn_output = self.self_attn(x, x, x, look_ahead_mask)
        
        # 2. 첫 번째 잔차 연결과 레이어 정규화
        x = self.norm1(x + self.dropout1(self_attn_output))
        
        # 3. 인코더-디코더 어텐션
        # Query: 디코더의 출력, Key/Value: 인코더의 출력
        cross_attn_output = self.cross_attn(x, enc_output, enc_output, padding_mask)
        
        # 4. 두 번째 잔차 연결과 레이어 정규화
        x = self.norm2(x + self.dropout2(cross_attn_output))
        
        # 5. 피드 포워드 네트워크
        ff_output = self.feed_forward(x)
        
        # 6. 세 번째 잔차 연결과 레이어 정규화
        x = self.norm3(x + self.dropout3(ff_output))
        
        return x
class Encoder(nn.Module):
    """
    트랜스포머 인코더: 여러 인코더 레이어를 쌓은 구조입니다.
    """
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, 
                 max_seq_len=5000, dropout=0.1):
        super(Encoder, self).__init__()
        
        self.d_model = d_model
        
        # 입력 임베딩 레이어
        self.embedding = nn.Embedding(vocab_size, d_model)
        # 포지셔널 인코딩
        self.pos_encoding = PositionalEncoding(d_model, max_seq_len)
        
        # 인코더 레이어 스택
        self.enc_layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        """
        인코더 순전파 계산을 수행합니다.
        
        Args:
            x: 입력 시퀀스 [batch_size, seq_len]
            mask: 패딩 마스크
            
        Returns:
            인코더 출력 [batch_size, seq_len, d_model]
        """
        seq_len = x.size(1)
        
        # 1. 임베딩 및 포지셔널 인코딩 적용
        x = self.embedding(x) * math.sqrt(self.d_model)  # 스케일링
        x = self.pos_encoding(x)
        x = self.dropout(x)
        
        # 2. 모든 인코더 레이어 순차적 적용
        for layer in self.enc_layers:
            x = layer(x, mask)
            
        return x  # [batch_size, seq_len, d_model]
class Decoder(nn.Module):
    """
    트랜스포머 디코더: 여러 디코더 레이어를 쌓은 구조입니다.
    """
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, 
                 max_seq_len=5000, dropout=0.1):
        super(Decoder, self).__init__()
        
        self.d_model = d_model
        
        # 입력 임베딩 레이어
        self.embedding = nn.Embedding(vocab_size, d_model)
        # 포지셔널 인코딩
        self.pos_encoding = PositionalEncoding(d_model, max_seq_len)
        
        # 디코더 레이어 스택
        self.dec_layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, enc_output, look_ahead_mask, padding_mask):
        """
        디코더 순전파 계산을 수행합니다.
        
        Args:
            x: 디코더 입력 시퀀스 [batch_size, seq_len]
            enc_output: 인코더 출력 [batch_size, enc_seq_len, d_model]
            look_ahead_mask: 룩-어헤드 마스크
            padding_mask: 패딩 마스크
            
        Returns:
            디코더 출력 [batch_size, seq_len, d_model]
        """
        seq_len = x.size(1)
        
        # 1. 임베딩 및 포지셔널 인코딩 적용
        x = self.embedding(x) * math.sqrt(self.d_model)  # 스케일링
        x = self.pos_encoding(x)
        x = self.dropout(x)
        
        # 2. 모든 디코더 레이어 순차적 적용
        for layer in self.dec_layers:
            x = layer(x, enc_output, look_ahead_mask, padding_mask)
            
        return x  # [batch_size, seq_len, d_model]
class Transformer(nn.Module):
    """
    완전한 트랜스포머 모델: 인코더와 디코더를 결합합니다.
    """
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_layers=6, 
                 num_heads=8, d_ff=2048, max_seq_len=5000, dropout=0.1):
        super(Transformer, self).__init__()
        
        # 인코더
        self.encoder = Encoder(
            src_vocab_size, d_model, num_layers, num_heads, d_ff, 
            max_seq_len, dropout
        )
        
        # 디코더
        self.decoder = Decoder(
            tgt_vocab_size, d_model, num_layers, num_heads, d_ff, 
            max_seq_len, dropout
        )
        
        # 최종 선형 변환 (디코더 출력 -> 어휘 분포)
        self.final_layer = nn.Linear(d_model, tgt_vocab_size)
        
    def create_masks(self, src, tgt):
        """
        소스 및 타겟 시퀀스에 대한 마스크를 생성합니다.
        
        Args:
            src: 소스 시퀀스 [batch_size, src_seq_len]
            tgt: 타겟 시퀀스 [batch_size, tgt_seq_len]
            
        Returns:
            인코더 패딩 마스크, 디코더 룩-어헤드 마스크, 디코더 패딩 마스크
        """
        # 소스 시퀀스 패딩 마스크 (0이 아닌 위치는 True, 패딩(0)은 False)
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)  # [batch_size, 1, 1, src_seq_len]
        
        # 타겟 시퀀스 패딩 마스크
        tgt_padding_mask = (tgt != 0).unsqueeze(1).unsqueeze(2)  # [batch_size, 1, 1, tgt_seq_len]
        
        # 타겟 시퀀스 룩-어헤드 마스크 (미래 위치 마스킹)
        # 하삼각행렬 형태로, 대각선 위쪽은 0(마스킹됨), 대각선과 그 아래는 1(마스킹 안 됨)
        seq_len = tgt.size(1)
        look_ahead_mask = torch.triu(torch.ones(1, seq_len, seq_len), diagonal=1).eq(0)
        look_ahead_mask = look_ahead_mask.to(tgt.device)
        
        # 룩-어헤드 마스크와 패딩 마스크 결합
        combined_mask = look_ahead_mask & tgt_padding_mask
        
        return src_mask, combined_mask, src_mask
        
    def forward(self, src, tgt):
        """
        트랜스포머 모델의 순전파 계산을 수행합니다.
        
        Args:
            src: 소스 시퀀스 [batch_size, src_seq_len]
            tgt: 타겟 시퀀스 [batch_size, tgt_seq_len]
            
        Returns:
            출력 로짓 [batch_size, tgt_seq_len, tgt_vocab_size]
        """
        # 마스크 생성
        enc_padding_mask, look_ahead_mask, dec_padding_mask = self.create_masks(src, tgt)
        
        # 인코더 순전파
        enc_output = self.encoder(src, enc_padding_mask)  # [batch_size, src_seq_len, d_model]
        
        # 디코더 순전파
        dec_output = self.decoder(
            tgt, enc_output, look_ahead_mask, dec_padding_mask
        )  # [batch_size, tgt_seq_len, d_model]
        
        # 최종 선형 변환
        final_output = self.final_layer(dec_output)  # [batch_size, tgt_seq_len, tgt_vocab_size]
        
        return final_output
profile
https://github.com/min731

0개의 댓글