위 주제는 밑바닥부터 시작하는 딥러닝2 5강, CS224d를 바탕으로 작성한 글 입니다.
이전 글에서 RNN에 대해서 간단히 알아보는 시간을 가졌다.
이전글 보러가기 >> 순환 신경망 RNN(1)
오늘은 RNN을 직접 구현해보는 시간!
Time RNN 계층 : 순환 구조를 펼친 후의 계층들을 하나의 계층으로 간주한다.
우리가 구현 할 신경망은 가로 방향으로 펼친 신경망이다.
Time RNN 계층 내에서 한 단계의 작업을 수행하는 계층을 'RNN 계층'이라고 하고, T개 단계분의 작업을 한꺼번에 처리하는 계층을 'Time RNN 계층'이라고 한다.
시계열 데이터를 한꺼번에 처리하는 계층 앞에 'Time'을 붙이겠다. ex. Time Affine, Time Embedding
RNN의 순전파 식이다. (이전글에서 설명 함!)
여기에서 데이터를 미니배치로 모아서 처리한다.
xt
와 ht
에 각 샘플 데이터를 행 방향(N)에 저장한다.
N
: 미니배치 크기D
: 입력 벡터의 차원수H
: 은닉 상태 벡터의 차원수class RNN:
def __init__(self, Wx, Wh, b):
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.cacahe = None
def forward(self, x, h_prev):
Wx, Wh, b = self.params
t = np.matul(h_prev, Wh) + np.matmul(x, Wx) + b
h_next = np.tanh(t)
self.cache = (x, h_prev, h_next)
return h_next
__init__
params
변수에 2개의 가중치와 1개의 편향을 저장grads
변수에 각 매개변수에 대응하는 형태로 기울기를 초기화 한 후 저장한다.cache
는 역전파 계산 시 사용하는 중간 데이터를 담아야 하기 때문에 만든다. forward
역전파는 어떻게 될까?
그림처럼 손실함수 J에 대한 미분은 연쇄법칙을 사용하여 RNN으로 역전파 된다.
def backward(self, dh_next):
Wx, Wh, b = self.params
x, h_prev, h_next = self.cache
dt = dh_next * (1 - h_next ** 2)
db = np.sum(dt, axis=0)
dWh = np.matmul(h_prev.T, dt)
dh_prev = np.matmul(dt, Wh.T)
dwx = np.matmul(x.T, dt)
dx = np.matmul(dt, Wx.T)
self.grads[0][...] = dWx
self.grads[1][...] = dWh
self.grads[2][...] = dWb
return dx, dh_prev
backward
그림에서 나오는 식을 그대로 위의 코드로 넣었다.
tanh를 미분하면 1-tanh**2의 값이 나오는 것을 참고해서 직접 미분해보면 쉽게 알 수 있다.
Time RNN 구조를 배웠으니, Time RNN 계층은 은닉 상태를 인스턴스 변수 h로 보관하는 것을 눈치 챌 수 있다.
그러면 은닉 상태를 다음 블록에 인계할 수 있다.
RNN 계층의 은닉 상태를 Time RNN 계층에서 관리한다.
class TimeRNN:
def __init__(self, Wx, Wh, b, stateful=False):
def __init__(self, Wx, Wh, b, stateful=False):
self.params = [Wx, Wh, b]
self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
self.layers = None
self.h, self.dh = None, None
self.stateful = stateful
def set_state(self, h):
self.h = h
def reset_state(self):
self.h = None
__init__
가중치와 편향, stateful이라는 boolean 값을 인수로 받는다. stateful
은 은닉 상태를 인계받을지를 조정하기 위해 있는 변수이다. (True : 유지)
'유지한다'는 말은 해당 시간의 은닉 상태를 메모리에서 기억하고 있겠다는 의미이다.
긴 시계열 데이터를 처리할 때 RNN의 은닉 상태를 유지하며, Time RNN 계층의 순전파를 끊지 않고 전파한다는 의미가 내포되어 있다.
Flase인 경우는 은닉 상태를 '영행렬'(모든 요소가 0인 행렬)로 초기화한다. 이것이 상태가 없는 모드이며, '무상태'라고 한다.
layers
인스턴스에 RNN 계층을 리스트로 저장한다.h
는 forward()시 마지막 RNN 계층의 은닉 상태를 저장한다.dh
는 backward()를 불렀을 때 하나 앞 블록의 은닉 상태의 기울기를 저장한다.def forward(self, xs):
Wx, Wh, b = self.params
N, T, D = xs.shape
D, H = Wx.shape
self.layers = []
hs = np.empty((N, T, H), dtype='f')
if not self.stateful or self.h is None:
self.h = zp.zeros((N, H), dtype='f')
for t in range(T):
layer = RNN(*self.params)
self.h = layer.forward(xs[:, t, :], self.h)
hs[:, t, :] = self.h
self.layers.append(layer)
return hs
forward
매개변수 인자 xs
로 입력을 받는다 xs
는 T개 분량의 시계열 데이터를 하나로 모은 것이다.
N
: 미니배치 크기
T
: T개 분량 시계열 데이터를 하나로 모은 것
D
: 입력 벡터의 차원 수
h는 처음 호출 시 (self.h가 None일 때)에는 원소가 모두 0인 영행렬로 초기화 된다. 그리고 인스턴스 변수 stateful이 False일 때도 항상 영행렬로 초기화 한다.
hs
: 출력값을 담을 변수이다.
다음으로 for문 안에서 T번 반복하여 RNN 계층을 생성한다.
계층 생성과 동시에 forward()
로 h
를 계산하고 이를 hs
에 해당 인덱스(시각)의 값으로 설정한다.
h
에는 마지막 RNN 계층의 은닉 상태가 저장된다.
그래서 다음번 forward()
메서드 호출 시 stateful
이 True면 먼저 저장된 h값이 이용되고,
False면 다시 영행렬로 초기화 된다.
layer = RNN(*self.params)
을 보면 T에 대해서 같은 params을 넣는 것을 볼 수 있는데, RNN의 의미에서 알 수 있다.
잊지말자, RNN 계층들은 사실 하나의 계층이다.
시간적으로 표현하기 위해 가로로 두어 여러 계층처럼 보이지만, 본질적으로는 하나의 계층이다.
하지만 서로 다른 Time RNN 계층의 가중치 조건은 사실은 서로 다르다. (후에 설명)
def backward(self, dhs):
Wx, Wh, b = self.params
N, T, H = dhs.shape
D, H = Wx.shape
dxs = np.empty((N, T, D), dtype='f')
dh = 0
grads = [0, 0, 0]
for t in reversed(range(T)):
layer = self.layers[t]
dx, dh = layer.backward(dhs[:, t, :] + dh) # dht + dhnext 합산된 기울기
dxs[:, t, :] = dx
for i, grad in enumerate(layer.grads):
grads[i] += grad
for i, grad in enumerate(grads):
self.grads[i][...] = grad
self.dh = dh
return dxs
backward
이전과 다르게 Time RNN은 Truncated BPTT를 수행하기 때문에 이 블록의 이전 시각 역전파는 필요하지 않다.
하지만 후에 seq2seq에 필요하기 때문에 이전 은닉 상태 기울기 dh는 저장해 놓는다.
t번째 RNN 계층에서는 위로부터의 기울기 dht와 '한 시각 뒤(미래) 계층'으로부터의 기울기 dhnext가 전해진다.
RNN 계층 순전파에서 출력이 2개로 분기 되어, 역전파에서는 각 기울기가 합산되어 전해진다.
따라서 역전파 시 RNN 계층에는 합산된 기울기(dht + dhnext)가 입력된다.
dxs
: 역전파 후 하류로 흘려보낼 기울기를 담을 변수앞서 RNN 계층은 본질적으로 하나의 계층을 시간적으로 표현한 것이라고 했는데, 순전파에서는 가중치들의 조건은 같았지만 역전파에서는 시간마다 grad(기울기)들이 모두 다르다.
그 이유는 입력받은 미분(dht) 값들이 모두 다르기 때문이다. (순전파 입장에서는 Xs값들이 각 시각마다 다르기 떄문이다.)
grads
에 덮어쓴다.a= np.array([[1, 2], [3, 4]])
b= np.array([5, 6], [7, 8]])
c=[a,b]
print(c[0])
>>> [[1 2]
[3 4]]
grad[0, 0]
for i, grad in enumerate(c):
grads[i] += grad
print(grads)
>>> [array([[1 2],
[3 4]]), array([[5, 6], [7, 8])]
gradss = [np.zeros_like(a), np.zeros_like(b)]
print(gradss)
>>> [array([[0 0]
[0 0]]), array([[0, 0], [0, 0])]
for i, grad in enumerate(gards):
gradss[i][...]= grad
print(grads)
>>> [array([[1 2],
[3 4]]), array([[5, 6], [7, 8])]
RNN을 이용해서 '언어 모델'을 구현해보자.
지금까지 시계열 데이터를 한꺼번에 처리하는 RNN계층을 구현했는데, RNNLM(RNN Language Model)을 완성해보자.
첫 번째 Embedding 계층을 통해 ID를 단어의 분산 표현으로 변환한다.
그리고 그 분산 표현이 RNN 계층으로 입력된다.
RNN 계층은 은닉 상태를 다음 층으로 Affine 계층과 softmax계층으로 출력하며, 같은 출력을 다음 시각의 RNN 계층 쪽으로 출력한다.
softmax를 통해 나온 확률은 입력 다음에 나올 단어에 대한 확률들이 높은 것을 확인할 수 있다.
'say'부분에서는 'goodbye', 'hello' 둘 다 높게 나왔는데 둘 다 'you say' 다음으로 자연스럽게 나올 수 있다.
RNN 계층이 'you say'라는 맥락을 기억하고 있다는 것이다.
즉 과거의 정보를 응집된 은닉 상태 벡터로 저장해 두고 있다.
이처럼 RNNLM은 입력된 단어를 기억하고, 그것을 바탕으로 다음에 출현할 단어를 예측한다.
RNN 계층이 과거에서 현재로 데이터를 계속 흘려보내줌으로써 과거의 정보를 인코딩해 저장(기억)할 수 있는 것!!
Time Embedding, Time Affine 계층을 구현해보자.
다시 말하지만 Time Affine, Time Embedding은 Affine 계층과 Embedding 계층을 T개 준비해서, 각 시각의 데이터를 개별적으로 처리하면 된다.
Time Affine 계층은 Affine 계층을 T개 준비해서, 각 시각의 데이터를 개별적으로 처리한다.
class TimeAffine:
def __init__(self, W, b):
self.params = [W, b]
self.grads = [np.zeros_like(W), np.zeros_like(b)]
self.x = None
def forward(self, x):
N, T, D = x.shape
W, b = self.params
rx = x.reshape(N*T, -1)
out = np.dot(rx, W) + b
self.x = x
return out.reshape(N, T, -1)
def backward(self, dout):
x = self.x
N, T, D = x.shape
W, b = self.params
dout = dout.reshape(N*T, -1)
rx = x.reshape(N*T, -1)
db = np.sum(dout, axis=0)
dW = np.dot(rx.T, dout)
dx = np.dot(dout, W.T)
dx = dx.reshape(*x.shape)
self.grads[0][...] = dW
self.grads[1][...] = db
return dx
Softmax 계층을 구현할 때는 손실 오차를 구하는 Cross Entropy Error 계층도 함께 구현한다.
x
값들은 확률로 변환되기 전 점수고 t
값들은 정답 레이블이다.
위는 한 블럭 당 최종 손실이지(미니 배치에 해당하는 손실의 평균) 전체 데이터 손실은 아니다.
만약에 N개의 미니배치라면, N개의 미니배치 손실을 더하여 N으로 나누어 1개당 평균 손실을 최종손실로 구하여 최종 출력으로 내보낸다.
class TimeSoftmaxWithLoss:
def __init__(self):
self.params, self.grads = [], []
self.cache = None
self.ignore_label = -1
def forward(self, xs, ts):
N, T, V = xs.shape
if ts.ndim == 3: # 정답 레이블이 원핫 벡터인 경우
ts = ts.argmax(axis=2)
mask = (ts != self.ignore_label)
# 배치용과 시계열용을 정리(reshape)
xs = xs.reshape(N * T, V)
ts = ts.reshape(N * T)
mask = mask.reshape(N * T)
ys = softmax(xs)
ls = np.log(ys[np.arange(N * T), ts])
ls *= mask # ignore_label에 해당하는 데이터는 손실을 0으로 설정
loss = -np.sum(ls)
loss /= mask.sum()
self.cache = (ts, ys, mask, (N, T, V))
return loss
def backward(self, dout=1):
ts, ys, mask, (N, T, V) = self.cache
dx = ys
dx[np.arange(N * T), ts] -= 1
dx *= dout
dx /= mask.sum()
dx *= mask[:, np.newaxis] # ignore_label에 해당하는 데이터는 기울기를 0으로 설정
dx = dx.reshape((N, T, V))
return dx
클래스 4개의 Time 계층을 쌓은 신경망이다.
class SimpleRnnlm:
def __init__(self, vocab_size, wordvec_size, hidden_size):
V, D, H = vocab_size, wordvec_size, hidden_size # wordvec_size는 입력벡터차원수(특정단어 분산표현)
rn = np.random.randn
# 가중치 초기화
embed_W = (rn(V, D) / 100).astype('f')
rnn_Wx = (rn(D, H) / np.sqrt(D)).astype('f')
rnn_Wh = (rn(H, H) / np.sqrt(H)).astype('f')
rnn_b = np.zeros(H).astype('f')
affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
affine_b = np.zeros(V).astype('f')
# 계층 생성
self.layers = [
TimeEmbedding(embed_W),
TimeRNN(rnn_Wx, rnn_Wh, rnn_b, stateful=True),
TimeAffine(affine_W, affine_b)
]
self.loss_layer = TimeSoftmaxWithLoss()
self.rnn_layer = self.layers[1]
# 모든 가중치와 기울기를 리스트에 모은다.
self.params, self.grads = [], []
for layer in self.layers:
self.params += layer.params
self.grads += layer.grads
__init__
각 계층에서 사용하는 매개변수를 초기화한다.
Time RNN 계층의 stateful을 True로 설정한다. (은닉 상태 인계)
RNN과 Affine 초기화를 보면 'Xavier 초깃값'을 이용해서 노드가 n개인 경우, np.sqrt(n)
으로 나누어 값들을 초기화 한다.
표준편차는 데이터의 차이를 직관적으로 나타내는 척도이다.
Forward와 Backward는 앞서 구축한 TimeRNN과 SoftmaxwithLoss의 코드를 사용하면 된다.
def forward(self, xs, ts):
for layer in self.layers:
xs = layer.forward(xs)
loss = self.loss_layer.forward(xs, ts)
return loss
def backward(self, dout=1):
dout = self.loss_layer.backward(dout)
for layer in reversed(self.layers):
dout = layer.backward(dout)
return dout
def reset_state(self):
self.rnn_layer.reset_state()
언어 모델의 '평가 방법'에 대해서 알아보자.
언어 모델은 주어진 과거 단어로부터 다음에 출현할 단어의 확률 분포를 출력한다.
이때 언어 모델의 예측 성능을 평가하는 척도로 퍼블렉서티(Perplexity)를 자주 사용한다.
간단하게 '확률의 역수'라고 할 수 있다.
예를 들어, 'you say goodbye and I say hello .' 에서
'you' 다음으로 'say'가 나올 확률이 0.8이라고 분포되어있고 정답이 'say'라면, 이때의 확률은 0.8이다.
퍼플렉서티는 이 확률의 역수 1/(0.8) = 1.25라고 할 수 있습니다.
만약 say가 나올 확률이 0.2라고 나왔다면 퍼플렉서티는 5일 것입니다.
퍼블렉시티는 작으수록 좋다는 것을 알 수 있다.
그렇다면 퍼블렉시티 값은 무슨 의미를 가지고 있는가? '분기수'(number of branches)로 해석할 수 있다.
즉, 예측한 분기수가 1.25이면 'you'라는 단어 다음 출현할 후보가 1개로 좁혀졌다는 이야기이다.
분기수가 5이면, 다음 출현할 후보가 5개라는 의미이다.
찍기전의 시험지의 5지선다 같은 느낌이다.
지금은 입력데이터가 하나일 때의 이야기이고 여러 개일 때는 다음 공식을 따른다.
N
: 데이터의 총 개수
tn
: 원핫 벡터로 나타낸 정답 레이블
tnk
: n 개째 데이터의 k번째 값을 의미
ynk
: 확률분포(신경망에서는 softmax의 출력)
눈치챈 사람도 있겠지만, 위의 L값은 교차 엔트로피 오차식과 완전히 같다.
퍼블렉서티는 L값을 이용해 exp 계산 해준 값이다.
# 하이퍼파라미터 설정
batch_size = 10
wordvec_size = 100
hidden_size = 100 # RNN의 은닉 상태 벡터의 원소 수
time_size = 5 # Truncated BPTT가 한 번에 펼치는 시간 크기
lr = 0.1
max_epoch = 100
# 학습 데이터 읽기(전체 중 1000개만)
corpus, word_to_id, id_to_word = ptb.load_data('train')
corpus_size = 1000
corpus = corpus[:corpus_size]
vocab_size = int(max(corpus) + 1) # 단어종류, corpus는 0부터 매겨질 것이므로 +1을 해주는 것임
xs = corpus[:-1] # 입력 (맨 마지막 값빼고를 의미)
ts = corpus[1:] # 출력(정답 레이블) ( 맨 첫번째 값 빼고를 의미 : 정답이라는 것은 그 다음 올 단어이므로)
data_size = len(xs)
print('말뭉치 크기: %d, 어휘 수: %d' % (corpus_size, vocab_size))
# 학습 시 사용하는 변수
max_iters = data_size // (batch_size * time_size)
time_idx = 0
total_loss = 0
loss_count = 0
ppl_list = []
# 모델 생성
model = SimpleRnnlm(vocab_size, wordvec_size, hidden_size)
optimizer = SGD(lr)
# 1. 각 미니배치의 각 샘플의 읽기 시작 위치를 계산
jump = (corpus_size - 1) // batch_size
offsets = [i * jump for i in range(batch_size)]
for epoch in range(max_epoch):
for iter in range(max_iters):
# 2. 미니배치 취득
batch_x = np.empty((batch_size, time_size), dtype='i')
batch_t = np.empty((batch_size, time_size), dtype='i')
for t in range(time_size):
for i, offset in enumerate(offsets):
batch_x[i, t] = xs[(offset + time_idx) % data_size]
batch_t[i, t] = ts[(offset + time_idx) % data_size]
time_idx += 1
# 기울기를 구하여 매개변수 갱신
loss = model.forward(batch_x, batch_t)
model.backward()
optimizer.update(model.params, model.grads)
total_loss += loss
loss_count += 1
# 3. 에폭마다 퍼플렉서티 평가
ppl = np.exp(total_loss / loss_count)
print('| 에폭 %d | 퍼플렉서티 %.2f'
% (epoch+1, ppl))
ppl_list.append(float(ppl))
total_loss, loss_count = 0, 0
학습을 진행할수록 퍼블렉시티가 낮아지는 것을 확인할 수 있다.
500으로 시작하여 5까지 떨어지며, 후보군이 평균 500에서 시작하여 5까지 내려갔다는 의미이다.
위의 코드에 대한 설명은 생략하겠다. 하지만 학습 코드는 기존의 신경망 학습과 거의 같다.
하지만 '데이터 제공 방법'과 '퍼블렉서티 계산' 부분에서 차이가 있는데,
코드에서 나온 숫자들에 해당하는 코드들을 생각해보는 시간을 가졌으면 한다.