Attention is All You Need(https://arxiv.org/pdf/1706.03762.pdf) 논문 내용을 리뷰하고 transformer 알고리즘을 분석하는 글입니다.
1.Multi-head attention에서 4차원 구조가 만들어지는 이유
2.super().__init__()의 역할
3.Encoder layer에서 _src, _ = self.self_attention(src, src, src, src_mask)의
코드는 MultiHeadAttentionLayer인데 forward method를 적지 않고도 바로 쓰이는 이유
4.잔차연결에서 dropout이 사용되는 이유
이 논문은 Recurrent Neural Networks, Convolutional Neural Networks를 사용하지 않고 Attention Mechanism만을 이용하는 Transformer라는 network 아키텍처를 제시한다.
이 구조는 기계 번역에서 '병렬처리'가 가능하다는 장점으로 앞선 networks들에 비해 상대적으로 적은 학습 시간으로도 더 좋은 결과를 만들어 냈다. BLEU score(https://wikidocs.net/31695)
이미지 출처 : https://ai.googleblog.com/2017/08/transformer-novel-neural-network.html
Transformer는 Encoder와 Decoder로 이루어져있다. 이 구조는 여러 단계를 거치는데 이 단계에서 사용되는 알고리즘들을 함수라고 생각하고 식으로 적어보면 좀더 쉽게 이해가 가능하다.
multi-head self-attention, position-wise FFNN의 두 가지는 각각의 Encoder layer를 이루는 2개의 sublayer이다. 이 sublayer들은 모두 resiudual connection을 거치게 된다. 또한 논문에서는 2개의 sublayer로 구성된 Encoder를 6개 사용한다.
x = input_vector
First_step = Embedding512(x) + PositionalEncoding512(x)
Second_step = layer_normalization(multihead_attention(First_step) + First_step)
Third_step = layer_normalization(FFN(Second_step) + Second_step)
enc_out = Third_step
Decoder는 3개의 sublayer로 구성이 되어있다. 첫번째와 두번째 sublayer는 multi-head self-attention이다. 다만 첫번째 attention은 학습시에 이미 번역이 된 단어들을 참고하는 것을 방지하기 위해서 masking을 사용한다. 또한 두번째 attention은 encoder의 마지막 attention 값을 K, V 벡터로 사용한다. 이 벡터에 대한 설명은 뒤에 하도록 한다. decoder의 마지막 sublayer는 FFNN으로 encoder와 동일하다. 논문에서는 마찬가지로 3개의 sublayer로 구성된 Decoder를 6개를 사용한다.
x = input_vector
First_step = Embedding512(x) + PositionalEncoding512(x)
Second_step = layer_normalization(masked_multihead_attention(First_step) + First_step)
Third_step = layer_normalization(multihead_attention(Second_step + enc_out) + Second_step)
Fourth_step = layer_normalization(FFN(Third_step) + Third_step)
dec_out = Fourth_step
이미지 출처 : https://ai.googleblog.com/2017/08/transformer-novel-neural-network.html
Attention은 Encoder와 Decoder에서 조금 다르게 쓰일뿐 목적은 같다. Query는 '던져주는 값'이며 Key는 Query의 값을 기준으로 찾는 대상이다. Value는 계산된 어텐션 값이 적용이 되는 Key의 실제 값이다.
중요한 것은 임베딩한 단어를 바로 사용해서 어텐션 값을 구하는 것이 아니고 Query, Key, Value라는 값을 사용해서 어텐션의 값을 구한다는 것이다. Query, Key, Value의 차원은 d_model을 num_head로 나누어준 값이 된다.
인코더의 첫번째 서브층(Encoder Self-Attention) : Query = Key = Value
디코더의 첫번째 서브층(Masked Decoder Self-Attention) : Query = Key = Value
디코더의 두번째 서브층(Encoder-Decoder Attention) : Query : 디코더 행렬 / Key = Value : 인코더 행렬
논문에서는 하나의 attention을 수행하는 것보다 작은 크기의 attention을 여러번 나누어 계산한 뒤 합쳐(concatenate)주는 것이 더 좋다고 한다.
Scaled Dot-Product Attention
Q와 K벡터를 이용해서 attention score -> softmax -> attention distribution을 구하고 여기에 V 벡터를 계산해서 attention value값을 구한다.
이미지 출처 : https://wikidocs.net/31379
이미지 출처 : https://www.youtube.com/watch?v=AA621UofTUA
이미지 출처 : https://wikidocs.net/31379
attention 값은 head 별로 1개씩 생성이 된다. embedding한 단어의 차원이 512이고, head의 개수가 8개라면 512 / 8 = 64의 차원을 가진 8가지 attention 값들이 만들어지는 것이다.
이미지 출처 : https://wikidocs.net/31379
이들 head를 이어서 붙여주면(concatenate) 완전한 attention 값들의 행렬이 된다.
이미지 출처 : https://pozalabs.github.io/transformer/
트랜스포머 또한 seq2seq와 마찬가지로 교사 강요(Teacher Forcing)을 사용하여 훈련되므로 학습 과정에서 디코더는 번역할 문장에 해당되는 sos je suis étudiant의 문장 행렬을 한 번에 입력받는다. 그리고 디코더는 이 문장 행렬로부터 각 시점의 단어를 예측하도록 훈련된다.
decoder의 첫번째 sublayer로서 masking out을 활용해서 attention 값을 만드는 학습시에 번역해야할 단어들을 참고하는 것을 방지하는 mask를 더해준다. 그 외에는 scaled dot-product attention과 동일하다.
decoder의 두번째 sublayer로서 query 값은 이전 decoder layer의 값을 사용하고, key value 값은 encoder의 output 값을 사용한다. query라는 것은 질문이라고 생각하면 되는 것 같다. decoder에서 참고가 필요한 값들을 query로 질문한다면 참고값들을 key에서 가져와 비교(softmax)해서 출력하는 것이다.
기본적으로 input layer - hidden layer - ouput layer로 구성이 된 fully connected feed-forward network가 사용이 된다.
linear trasformation 사이에 activation function으로는 ReLU 함수가 사용되고 있으며, network의 input이 attention을 거친 단어의 vector이므로 position별로 output을 계산한다는 특징을 가진다.
이미지 출처 : https://wikidocs.net/31379
RNN과 달리 transformer는 순차적인 입력을 바탕으로 처리를 하지 않는다. 따라서 단어의 위치정보를 다른 방식으로 처리를 해주어야 하는데, 이때 사용되는 방식이 posionional encoding이다.
이미지 출처 : https://wikidocs.net/31379
positional encoding을 위해서 sin함수와 cos함수를 사용한다. 여기에서 pos는 입력 문장에서의 임베딩 벡터의 위치를 타나내며, i는 임베딩 벡터 내의 차원의 인덱스를 의미한다. 또한 dmodel은 임베딩 벡터의 차원을 의미한다. positional encoding을 사용하면 같은 단어라고 하더라도 문장 내의 위치에 따라서 벡터의 값이 달라진다.
이미지 출처 : https://wikidocs.net/31379
x + sublayer(x)
이미지 출처 : https://wikidocs.net/31379
normalization 정규화는 gradient descent 과정에서 계산 결과가 발산하는 것을 줄여준다.
Layer Normalization는 텐서의 마지막 차원에 대해서 평균과 분산을 구하고, 이를 가지고 어떤 수식을 통해 값을 정규화하여 학습을 돕는 역할을 한다.
normalize에 대해서 아래 링크를 참고
https://sonsnotation.blogspot.com/2020/11/8-normalization.html
디코더의 끝단에는 다중 클래스 분류 문제를 풀 수 있도록, vocab_size 만큼의 뉴런을 가지는 출력층을 추가해준다.
d_model : 인코더와 디코더에서 정해진 입력과 출력의 크기. 이 크기가 유지 되어야 여러개의 인코더와 디코더를 쌓아서 사용이 가능하다. 논문에서는 512를 사용했다.
num_layers : 인코더와 디코더를 쌓으 개수 논문에서는 각각 6개의 인코더와 디코더를 사용했다.
num_heads : 멀티헤드 어텐션으로 병렬 어텐션을 사용하는 개수. 논문에서는 8개의 헤드를 사용했다.
dff : feed-forward-network의 은닉층의 크기 논문에서는 2048개의 뉴런을 사용했다.
https://wikidocs.net/31379의 코드를 참고해서 분석해보았다.
super().__init__()
관련 문제로 이 코드는 실행이 안된다. 아직 해결하지 못했다.
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
class PositionalEncoding(tf.keras.layers.Layer):
def __init__(self, position, d_model):
super(PositionalEncoding, self).__init__
self.pos_encoding = self.positional_encoding(position, d_model)
def get_angles(self, position, i, d_model):
# angles = (1,128) positon(50,1)
angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
return position * angles
def positional_encoding(self, position, d_model):
angle_rads = self.get_angles(
# angles = (1,128) positon(50,1)
position = tf.range(position, dtype=tf.float32)[:, tf.newaxis],
i = tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
d_model = d_model
)
# 구한 angle 값들에 sin, cos을 취해준다.
sines = tf.math.sin(angle_rads[:, 0::2])
cosines = tf.math.cos(angle_rads[:, 1::2])
# 빈공간을 만들어주고 구한 sin,cos 값들을 넣어준다.
angle_rads = np.zeros(angle_rads.shape)
angle_rads[:, 0::2] = sines
angle_rads[:, 1::2] = cosines
pos_encoding = tf.constant(angle_rads)
pos_encoding = pos_encoding[tf.newaxis, ...]
print(pos_encoding.shape)
return tf.cast(pos_encoding, tf.float32)
def call(self, inputs):
return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]
과정을 보면 알겠지만 Q와 K가 곱해질때 Attention 값의 상대적인 비중은 정해진다.
즉 비슷한 위치에 값들이 모여있다면 두 벡터의 가중합의 크기는 더 커질 것이다.
def scaled_dot_product_attention(query, key, value, mask):
# query 크기 : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
# key 크기 : (batch_size, num_heads, key의 문장 길이, d_model/num_heads)
# value 크기 : (batch_size, num_heads, value의 문장 길이, d_model/num_heads)
# padding_mask : (batch_size, 1, 1, key의 문장 길이)
# Q와 K의 곱. 어텐션 스코어 행렬.
matmul_qk = tf.matmul(query, key, transpose_b=True)
# 스케일링 // dk의 루트값으로 나눠준다.
depth = tf.cast(tf.shape(key[-1], tf.float32))
logits = matmul_qk / tf.math.sqrt(depth)
# 마스킹. 어텐션 스코어 행렬의 마스킹 할 위치에 매우 작은 음수값을 넣는다.
# 매우 작은 값이므로 소프트맥스 함수를 지나면 행렬의 해당 위치의 값은 0이 된다.
if mask is not None:
logits += (mask * -1e9)
# 소프트맥스 함수는 마지막 차원인 key의 문장 길이 방향으로 수행된다.(2차원의 경우 가로방향 axis=1)
# attention weight : (batch_size, num_heads, query의 문장 길이, key의 문장 길이)
# attention distribution이 쌓여진 값
attention_weights = tf.nn.softmax(logits, axis=-1)
# output : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
# attention value
output = tf.matmul(attention_weights, value)
return output, attention_weights
class MultiHeadAttention(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads, name="multi_head_attention"):
super(MultiHeadAttention, self).__init__(name=name)
self.num_heads = num_heads
self.d_model = d_model
# 병렬을 하는 것이기에 나누어 떨어져야한다.
assert d_model % self.num_heads == 0
# d_model을 num_heads로 나눈 값.
# 논문 기준 : 64
self.depth = d_model // self.num_heads
# WQ, WK, WV에 해당하는 밀집층 정의
# 512의 아웃풋을 만드는 은닉층처럼 만든다
self.query_dense = tf.keras.layers.Dense(units=d_model)
self.key_dense = tf.keras.layers.Dense(units=d_model)
self.value_dense = tf.keras.layers.Dense(units=d_model)
# WO에 해당하는 밀집층 정의
self.dense = tf.keras.layers.Dense(units=d_model)
# num_heads 개수만큼 q, k, v를 split하는 함수
def split_heads(self, inputs, batch_size):
inputs = tf.reshape(
# 왜 4차원으로 바꿔주는 걸까?
# 앞서 보았던 그림처럼 num_heads는 3차원으로 생각하고
# 기본적으로는 batch_size, depth를 가지는 2차원이 q, k, v의 형태로 잡고갔는데...
inputs, shape=(batch_size, -1, self.num_heads, self.depth))
return tf.transpose(inputs, perm=[0, 2, 1, 3])
def call(self, inputs):
query, key, value, mask = inputs['query'], inputs['key'], inputs[
'value'], inputs['mask']
batch_size = tf.shape(query)[0]
# 1. WQ, WK, WV에 해당하는 밀집층 지나기
# q : (batch_size, query의 문장 길이, d_model)
# k : (batch_size, key의 문장 길이, d_model)
# v : (batch_size, value의 문장 길이, d_model)
# 참고) 인코더(k, v)-디코더(q) 어텐션에서는 query 길이와 key, value의 길이는 다를 수 있다.
query = self.query_dense(query)
key = self.key_dense(key)
value = self.value_dense(value)
# 2. 헤드 나누기
# q : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
# k : (batch_size, num_heads, key의 문장 길이, d_model/num_heads)
# v : (batch_size, num_heads, value의 문장 길이, d_model/num_heads)
query = self.split_heads(query, batch_size)
key = self.split_heads(key, batch_size)
value = self.split_heads(value, batch_size)
# 3. 스케일드 닷 프로덕트 어텐션. 앞서 구현한 함수 사용.
# (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
scaled_attention, _ = scaled_dot_product_attention(query, key, value, mask)
# (batch_size, query의 문장 길이, num_heads, d_model/num_heads)
scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
# 4. 헤드 연결(concatenate)하기
# (batch_size, query의 문장 길이, d_model)
concat_attention = tf.reshape(scaled_attention,
(batch_size, -1, self.d_model))
# 5. WO에 해당하는 밀집층 지나기
# (batch_size, query의 문장 길이, d_model)
outputs = self.dense(concat_attention)
return outputs
def create_padding_mask(x):
mask = tf.cast(tf.math.equal(x, 0), tf.float32)
# (batch_size, 1, 1, key의 문장 길이)
return mask[:, tf.newaxis, tf.newaxis, :]
def encoder_layer(dff, d_model, num_heads, dropout, name='encoder_layer'):
inputs = tf.keras.Input(shape=(None, d_model), name='inputs')
# 인코더는 패딩 마스크 사용
padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
# 멀티-헤드 어텐션 (첫번째 서브층 / 셀프 어텐션)
attention = MultiHeadAttention(
d_model, num_heads, name="attention")({
'query': inputs, 'key': inputs, 'value': inputs, # Q = K = V
'mask': padding_mask # 패딩 마스크 사용
})
# 드롭아웃 + 잔차 연결과 층 정규화
attention = tf.keras.layers.Dropout(rate=dropout)(attention)
attention = tf.keras.layers.LayerNormalization(epsilon=1e-6)(inputs + attention)
# 포지션 와이즈 피드 포워드 신경망 (두번째 서브층)
outputs = tf.keras.layers.Dense(units=dff, activation='relu')(attention)
outputs = tf.keras.layers.Dense(units=d_model)(outputs)
# 드롭아웃 + 잔차 연결과 층 정규화
outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention + outputs)
return tf.keras.Model(inputs=[inputs, padding_mask], outputs=outputs, name=name)
def encoder(vocab_size, num_layers, dff, d_model, num_heads, dropout, name='encoder'):
inputs = tf.keras.Input(shape=(None,), name="inputs")
# 인코더는 패딩 마스크 사용
padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
# 포지셔널 인코딩 + 드롭아웃
embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
# 인코더를 num_layers개 쌓기
for i in range(num_layers):
outputs = encoder_layer(dff=dff, d_model=d_model, num_heads=num_heads,
dropout=dropout, name="encoder_layer_{}".format(i),
)([outputs, padding_mask])
return tf.keras.Model(inputs=[inputs, padding_mask], outputs=outputs, name=name)
# 디코더의 첫번째 서브층(sublayer)에서 미래 토큰을 Mask하는 함수
def create_look_ahead_mask(x):
seq_len = tf.shape(x)[1]
look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
padding_mask = create_padding_mask(x) # 패딩 마스크도 포함
return tf.maximum(look_ahead_mask, padding_mask)
https://github.com/ndb796/Deep-Learning-Paper-Review-and-Practice 동빈나님의 코드를 분석해보았다.
import torch.nn as nn
class MultiHeadAttentionLayer(nn.Module):
def __init__(self, hidden_dim, n_heads, dropout_ratio, device):
super().__init__()
assert hidden_dim % n_heads == 0
self.hidden_dim = hidden_dim # 임베딩 차원
self.n_heads = n_heads # 헤드(head)의 개수: 서로 다른 어텐션(attention) 컨셉의 수
self.head_dim = hidden_dim // n_heads # 각 헤드(head)에서의 임베딩 차원
self.fc_q = nn.Linear(hidden_dim, hidden_dim) # Query 값에 적용될 FC 레이어
self.fc_k = nn.Linear(hidden_dim, hidden_dim) # Key 값에 적용될 FC 레이어
self.fc_v = nn.Linear(hidden_dim, hidden_dim) # Value 값에 적용될 FC 레이어
self.fc_o = nn.Linear(hidden_dim, hidden_dim)
self.dropout = nn.Dropout(dropout_ratio)
self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)
def forward(self, query, key, value, mask = None):
batch_size = query.shape[0]
# query: [batch_size, query_len, hidden_dim]
# key: [batch_size, key_len, hidden_dim]
# value: [batch_size, value_len, hidden_dim]
Q = self.fc_q(query)
K = self.fc_k(key)
V = self.fc_v(value)
# Q: [batch_size, query_len, hidden_dim]
# K: [batch_size, key_len, hidden_dim]
# V: [batch_size, value_len, hidden_dim]
# hidden_dim → n_heads X head_dim 형태로 변형
# n_heads(h)개의 서로 다른 어텐션(attention) 컨셉을 학습하도록 유도
Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
# Q: [batch_size, n_heads, query_len, head_dim]
# K: [batch_size, n_heads, key_len, head_dim]
# V: [batch_size, n_heads, value_len, head_dim]
# Attention Energy 계산
energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale
# energy: [batch_size, n_heads, query_len, key_len]
# 마스크(mask)를 사용하는 경우
if mask is not None:
# 마스크(mask) 값이 0인 부분을 -1e10으로 채우기
energy = energy.masked_fill(mask==0, -1e10)
# 어텐션(attention) 스코어 계산: 각 단어에 대한 확률 값
attention = torch.softmax(energy, dim=-1)
# attention: [batch_size, n_heads, query_len, key_len]
# 여기에서 Scaled Dot-Product Attention을 계산
x = torch.matmul(self.dropout(attention), V)
# x: [batch_size, n_heads, query_len, head_dim]
x = x.permute(0, 2, 1, 3).contiguous()
# x: [batch_size, query_len, n_heads, head_dim]
x = x.view(batch_size, -1, self.hidden_dim)
# x: [batch_size, query_len, hidden_dim]
x = self.fc_o(x)
# x: [batch_size, query_len, hidden_dim]
return x, attention
class PositionwiseFeedforwardLayer(nn.Module):
def __init__(self, hidden_dim, pf_dim, dropout_ratio):
super().__init__()
self.fc_1 = nn.Linear(hidden_dim, pf_dim)
self.fc_2 = nn.Linear(pf_dim, hidden_dim)
self.dropout = nn.Dropout(dropout_ratio)
def forward(self, x):
# x: [batch_size, seq_len, hidden_dim]
x = self.dropout(torch.relu(self.fc_1(x)))
# x: [batch_size, seq_len, pf_dim]
x = self.fc_2(x)
# x: [batch_size, seq_len, hidden_dim]
return x
class EncoderLayer(nn.Module):
def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
super().__init__()
self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
self.ff_layer_norm = nn.LayerNorm(hidden_dim)
self.self_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
self.positionwise_feedforward = PositionwiseFeedforwardLayer(hidden_dim, pf_dim, dropout_ratio)
self.dropout = nn.Dropout(dropout_ratio)
# 하나의 임베딩이 복제되어 Query, Key, Value로 입력되는 방식
def forward(self, src, src_mask):
# src: [batch_size, src_len, hidden_dim]
# src_mask: [batch_size, src_len]
# self attention
# 필요한 경우 마스크(mask) 행렬을 이용하여 어텐션(attention)할 단어를 조절 가능
_src, _ = self.self_attention(src, src, src, src_mask)
# dropout, residual connection and layer norm
src = self.self_attn_layer_norm(src + self.dropout(_src))
# src: [batch_size, src_len, hidden_dim]
# position-wise feedforward
_src = self.positionwise_feedforward(src)
# dropout, residual and layer norm
src = self.ff_layer_norm(src + self.dropout(_src))
# src: [batch_size, src_len, hidden_dim]
return src
class Encoder(nn.Module):
def __init__(self, input_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length=100):
super().__init__()
self.device = device
self.tok_embedding = nn.Embedding(input_dim, hidden_dim)
self.pos_embedding = nn.Embedding(max_length, hidden_dim)
self.layers = nn.ModuleList([EncoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])
self.dropout = nn.Dropout(dropout_ratio)
self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)
def forward(self, src, src_mask):
# src: [batch_size, src_len]
# src_mask: [batch_size, src_len]
batch_size = src.shape[0]
src_len = src.shape[1]
pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
# pos: [batch_size, src_len]
# 소스 문장의 임베딩과 위치 임베딩을 더한 것을 사용
src = self.dropout((self.tok_embedding(src) * self.scale) + self.pos_embedding(pos))
# src: [batch_size, src_len, hidden_dim]
# 모든 인코더 레이어를 차례대로 거치면서 순전파(forward) 수행
for layer in self.layers:
src = layer(src, src_mask)
# src: [batch_size, src_len, hidden_dim]
return src # 마지막 레이어의 출력을 반환