import numpy as np
# The layers below come from the book's (Deep Learning from Scratch 2) common/
# package; these import paths assume that repository layout.
from common.time_layers import TimeEmbedding, TimeLSTM, TimeAffine, TimeSoftmaxWithLoss
from common.layers import Softmax
from common.base_model import BaseModel


class Encoder:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4*H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4*H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4*H).astype('f')

        self.embed = TimeEmbedding(embed_W)
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=False)

        self.params = self.embed.params + self.lstm.params
        self.grads = self.embed.grads + self.lstm.grads
        self.hs = None

    def forward(self, xs):
        xs = self.embed.forward(xs)
        hs = self.lstm.forward(xs)
        self.hs = hs                # keep all hidden states for backward
        return hs[:, -1, :]         # only the last hidden state goes to the decoder

    def backward(self, dh):
        dhs = np.zeros_like(self.hs)
        dhs[:, -1, :] = dh          # gradient enters only at the last time step
        dout = self.lstm.backward(dhs)
        dout = self.embed.backward(dout)
        return dout
class Decoder:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4*H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4*H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4*H).astype('f')
        affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
        affine_b = np.zeros(V).astype('f')

        self.embed = TimeEmbedding(embed_W)
        # stateful=True: keep the hidden state set by set_state across forward calls
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
        self.affine = TimeAffine(affine_W, affine_b)

        self.params = self.embed.params + self.lstm.params + self.affine.params
        self.grads = self.embed.grads + self.lstm.grads + self.affine.grads

    def forward(self, xs, h):
        self.lstm.set_state(h)
        out = self.embed.forward(xs)
        out = self.lstm.forward(out)
        score = self.affine.forward(out)
        return score

    def backward(self, dscore):
        dout = self.affine.backward(dscore)
        dout = self.lstm.backward(dout)
        dout = self.embed.backward(dout)
        dh = self.lstm.dh           # gradient for the encoder's last hidden state
        return dh

    def generate(self, h, start_id, sample_size):
        sampled = []
        sample_id = start_id
        self.lstm.set_state(h)

        for _ in range(sample_size):
            x = np.array(sample_id).reshape((1, 1))
            out = self.embed.forward(x)
            out = self.lstm.forward(out)    # feed the embedding output, not the raw id
            score = self.affine.forward(out)

            sample_id = np.argmax(score.flatten())
            sampled.append(int(sample_id))

        return sampled
class Seq2seq(BaseModel):
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        self.encoder = Encoder(V, D, H)
        self.decoder = Decoder(V, D, H)
        self.softmax = TimeSoftmaxWithLoss()

        self.params = self.encoder.params + self.decoder.params
        self.grads = self.encoder.grads + self.decoder.grads

    def forward(self, xs, ts):
        # decoder input and target are the same sequence shifted by one step
        decoder_xs, decoder_ts = ts[:, :-1], ts[:, 1:]
        h = self.encoder.forward(xs)
        score = self.decoder.forward(decoder_xs, h)
        loss = self.softmax.forward(score, decoder_ts)
        return loss

    def backward(self, dout=1):
        dout = self.softmax.backward(dout)
        dh = self.decoder.backward(dout)
        dout = self.encoder.backward(dh)
        return dout

    def generate(self, xs, start_id, sample_size):
        h = self.encoder.forward(xs)
        sampled = self.decoder.generate(h, start_id, sample_size)
        return sampled
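A quick shape check helps here: the encoder turns a batch of token-id sequences into a single hidden vector per sample, and only that last vector reaches the decoder. A NumPy-only sketch (the sizes below are made up for illustration):

import numpy as np

N, T, H = 2, 5, 4                  # batch size, time steps, hidden size
hs = np.random.randn(N, T, H)      # stand-in for the TimeLSTM output
h = hs[:, -1, :]                   # what Encoder.forward hands to the decoder
print(h.shape)                     # (2, 4)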
Improvement: the peeky decoder. The encoder's hidden state h is fed not only into the first LSTM cell but into every time step of the LSTM and Affine layers.
class PeekyDecoder(Decoder):
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(H + D, 4*H) / np.sqrt(H + D)).astype('f')   # changed: LSTM input also carries h
        lstm_Wh = (rn(H, 4*H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4*H).astype('f')
        affine_W = (rn(H + H, V) / np.sqrt(H + H)).astype('f')    # changed: Affine input also carries h
        affine_b = np.zeros(V).astype('f')

        self.embed = TimeEmbedding(embed_W)
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
        self.affine = TimeAffine(affine_W, affine_b)

        self.params = self.embed.params + self.lstm.params + self.affine.params
        self.grads = self.embed.grads + self.lstm.grads + self.affine.grads
        self.cache = None

    def forward(self, xs, h):
        N, T = xs.shape
        N, H = h.shape

        self.lstm.set_state(h)

        out = self.embed.forward(xs)
        hs = np.repeat(h, T, axis=0).reshape(N, T, H)   # changed: copy h to every time step
        out = np.concatenate((hs, out), axis=2)         # changed

        out = self.lstm.forward(out)
        out = np.concatenate((hs, out), axis=2)         # changed

        score = self.affine.forward(out)
        self.cache = H
        return score

    def backward(self, dscore):
        H = self.cache

        dout = self.affine.backward(dscore)
        dout, dhs0 = dout[:, :, H:], dout[:, :, :H]     # split off the gradient for the copied h
        dout = self.lstm.backward(dout)
        dembed, dhs1 = dout[:, :, H:], dout[:, :, :H]
        self.embed.backward(dembed)

        dhs = dhs0 + dhs1
        dh = self.lstm.dh + np.sum(dhs, axis=1)         # h is used at every step, so its gradients add up
        return dh

    def generate(self, h, start_id, sample_size):
        sampled = []
        sample_id = start_id
        self.lstm.set_state(h)

        H = h.shape[1]
        peeky_h = h.reshape(1, 1, H)
        for _ in range(sample_size):
            x = np.array([sample_id]).reshape((1, 1))
            out = self.embed.forward(x)

            out = np.concatenate((peeky_h, out), axis=2)
            out = self.lstm.forward(out)
            out = np.concatenate((peeky_h, out), axis=2)
            score = self.affine.forward(out)

            sample_id = np.argmax(score.flatten())
            sampled.append(int(sample_id))

        return sampled
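The "# changed" lines are the heart of the peeky idea: h is copied to every time step and concatenated onto the LSTM input and the Affine input, which is why those weight matrices grew to H+D and H+H input columns. A NumPy-only sketch of the concatenation (all sizes made up):

import numpy as np

N, T, H, D = 2, 3, 4, 5
h = np.random.randn(N, H)                        # encoder output
emb = np.random.randn(N, T, D)                   # embedded decoder inputs
hs = np.repeat(h, T, axis=0).reshape(N, T, H)    # copy h to every time step
out = np.concatenate((hs, emb), axis=2)
print(out.shape)                                 # (2, 3, 9) == (N, T, H + D)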
class WeightSum:   # computes the context vector
    def __init__(self):
        self.params, self.grads = [], []
        self.cache = None

    def forward(self, hs, a):
        N, T, H = hs.shape

        ar = a.reshape(N, T, 1).repeat(H, axis=2)
        t = hs * ar
        c = np.sum(t, axis=1)       # weighted sum over the time axis

        self.cache = (hs, ar)
        return c

    def backward(self, dc):
        hs, ar = self.cache
        N, T, H = hs.shape

        dt = dc.reshape(N, 1, H).repeat(T, axis=1)   # undo the sum
        dar = dt * hs
        dhs = dt * ar
        da = np.sum(dar, axis=2)                     # undo the repeat

        return dhs, da
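To see the weighted sum at work, here is a tiny NumPy-only example with hand-picked values: because the weights sum to 1, c is a weighted average of the T hidden vectors.

import numpy as np

N, T, H = 1, 3, 2
hs = np.arange(N * T * H, dtype='f').reshape(N, T, H)
a = np.array([[0.2, 0.3, 0.5]], dtype='f')   # attention weights, sum to 1
ar = a.reshape(N, T, 1).repeat(H, axis=2)
c = np.sum(hs * ar, axis=1)                  # context vector, shape (N, H)
print(c)                                     # [[2.6 3.6]]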
class AttentionWeight:   # computes the attention weights
    def __init__(self):
        self.params, self.grads = [], []
        self.softmax = Softmax()
        self.cache = None

    def forward(self, hs, h):
        N, T, H = hs.shape

        hr = h.reshape(N, 1, H).repeat(T, axis=1)   # broadcast h across the T time steps
        t = hs * hr
        s = np.sum(t, axis=2)       # dot-product score for each time step
        a = self.softmax.forward(s)

        self.cache = (hs, hr)
        return a

    def backward(self, da):
        hs, hr = self.cache
        N, T, H = hs.shape

        ds = self.softmax.backward(da)
        dt = ds.reshape(N, T, 1).repeat(H, axis=2)
        dhs = dt * hr
        dhr = dt * hs
        dh = np.sum(dhr, axis=1)

        return dhs, dh
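The score here is a plain dot product between h and each row of hs, normalized with a softmax. A NumPy-only version of the forward pass, with the softmax written inline and made-up sizes:

import numpy as np

N, T, H = 1, 3, 2
hs = np.random.randn(N, T, H).astype('f')
h = np.random.randn(N, H).astype('f')

hr = h.reshape(N, 1, H).repeat(T, axis=1)    # broadcast h across time
s = np.sum(hs * hr, axis=2)                  # dot-product scores, (N, T)
a = np.exp(s) / np.exp(s).sum(axis=1, keepdims=True)
print(a.sum(axis=1))                         # sums to 1 (up to float precision)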
class Attention:
    def __init__(self):
        self.params, self.grads = [], []
        self.attention_weight_layer = AttentionWeight()
        self.weight_sum_layer = WeightSum()
        self.attention_weight = None    # kept for later visualization

    def forward(self, hs, h):
        a = self.attention_weight_layer.forward(hs, h)
        out = self.weight_sum_layer.forward(hs, a)
        self.attention_weight = a
        return out

    def backward(self, dout):
        dhs0, da = self.weight_sum_layer.backward(dout)
        dhs1, dh = self.attention_weight_layer.backward(da)
        dhs = dhs0 + dhs1   # hs feeds both sublayers, so their gradients add
        return dhs, dh
class TimeAttention:
    def __init__(self):
        self.params, self.grads = [], []
        self.layers = None
        self.attention_weights = None

    def forward(self, hs_enc, hs_dec):
        N, T, H = hs_dec.shape
        out = np.empty_like(hs_dec)
        self.layers = []
        self.attention_weights = []

        for t in range(T):
            layer = Attention()
            out[:, t, :] = layer.forward(hs_enc, hs_dec[:, t, :])
            self.layers.append(layer)
            self.attention_weights.append(layer.attention_weight)

        return out

    def backward(self, dout):
        N, T, H = dout.shape
        dhs_enc = 0
        dhs_dec = np.empty_like(dout)

        for t in range(T):
            layer = self.layers[t]
            dhs, dh = layer.backward(dout[:, t, :])
            dhs_enc += dhs
            dhs_dec[:, t, :] = dh

        return dhs_enc, dhs_dec
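The loop over t above can also be written as one batched computation. The NumPy-only sketch below (sizes made up) reproduces TimeAttention.forward with einsum, which is a handy way to check the shapes: scores are (N, T_dec, T_enc) and the contexts come back as (N, T_dec, H).

import numpy as np

N, T_enc, T_dec, H = 2, 4, 3, 5
hs_enc = np.random.randn(N, T_enc, H)
hs_dec = np.random.randn(N, T_dec, H)

s = np.einsum('nth,nsh->nts', hs_dec, hs_enc)         # dot-product scores
a = np.exp(s) / np.exp(s).sum(axis=2, keepdims=True)  # softmax over encoder steps
c = np.einsum('nts,nsh->nth', a, hs_enc)              # context vectors
print(c.shape)                                        # (2, 3, 5)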
class AttentionEncoder(Encoder):
    def forward(self, xs):
        xs = self.embed.forward(xs)
        hs = self.lstm.forward(xs)
        return hs               # return all hidden states, not just the last one

    def backward(self, dhs):
        dout = self.lstm.backward(dhs)
        dout = self.embed.backward(dout)
        return dout
class AttentionDecoder:
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        rn = np.random.randn
        embed_W = (rn(V, D) / 100).astype('f')
        lstm_Wx = (rn(D, 4*H) / np.sqrt(D)).astype('f')
        lstm_Wh = (rn(H, 4*H) / np.sqrt(H)).astype('f')
        lstm_b = np.zeros(4*H).astype('f')
        affine_W = (rn(2*H, V) / np.sqrt(2*H)).astype('f')   # Affine sees c and dec_hs concatenated
        affine_b = np.zeros(V).astype('f')

        self.embed = TimeEmbedding(embed_W)
        self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
        self.attention = TimeAttention()
        self.affine = TimeAffine(affine_W, affine_b)

        self.params = self.embed.params + self.lstm.params + self.attention.params + self.affine.params
        self.grads = self.embed.grads + self.lstm.grads + self.attention.grads + self.affine.grads

    def forward(self, xs, enc_hs):
        h = enc_hs[:, -1]       # seed the LSTM with the encoder's last hidden state
        self.lstm.set_state(h)

        out = self.embed.forward(xs)
        dec_hs = self.lstm.forward(out)
        c = self.attention.forward(enc_hs, dec_hs)
        out = np.concatenate((c, dec_hs), axis=2)
        score = self.affine.forward(out)

        return score

    def backward(self, dscore):
        dout = self.affine.backward(dscore)
        N, T, H2 = dout.shape
        H = H2 // 2

        dc, ddec_hs0 = dout[:, :, :H], dout[:, :, H:]
        denc_hs, ddec_hs1 = self.attention.backward(dc)
        ddec_hs = ddec_hs0 + ddec_hs1
        dout = self.lstm.backward(ddec_hs)
        dh = self.lstm.dh
        denc_hs[:, -1] += dh    # gradient for the seed state flows back to the last encoder step
        self.embed.backward(dout)

        return denc_hs

    def generate(self, enc_hs, start_id, sample_size):
        sampled = []
        sample_id = start_id
        h = enc_hs[:, -1]
        self.lstm.set_state(h)

        for _ in range(sample_size):
            x = np.array([sample_id]).reshape((1, 1))

            out = self.embed.forward(x)
            dec_hs = self.lstm.forward(out)
            c = self.attention.forward(enc_hs, dec_hs)
            out = np.concatenate((c, dec_hs), axis=2)
            score = self.affine.forward(out)

            sample_id = np.argmax(score.flatten())
            sampled.append(int(sample_id))

        return sampled
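Because the Affine layer sees the context vector c and the LSTM output side by side, its weight matrix takes 2*H inputs, which is what the affine_W shape above reflects. A NumPy-only shape check (sizes made up):

import numpy as np

N, T, H, V = 2, 3, 4, 10
c = np.random.randn(N, T, H)        # context vectors from TimeAttention
dec_hs = np.random.randn(N, T, H)   # decoder LSTM outputs
out = np.concatenate((c, dec_hs), axis=2)
print(out.shape)                    # (2, 3, 8): the Affine input size is 2*H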
class AttentionSeq2seq(Seq2seq):
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        args = vocab_size, wordvec_size, hidden_size
        self.encoder = AttentionEncoder(*args)
        self.decoder = AttentionDecoder(*args)
        self.softmax = TimeSoftmaxWithLoss()

        self.params = self.encoder.params + self.decoder.params
        self.grads = self.encoder.grads + self.decoder.grads

    def forward(self, xs, ts):
        decoder_xs, decoder_ts = ts[:, :-1], ts[:, 1:]
        hs = self.encoder.forward(xs)           # all encoder hidden states this time
        score = self.decoder.forward(decoder_xs, hs)
        loss = self.softmax.forward(score, decoder_ts)
        return loss

    def backward(self, dout=1):
        dout = self.softmax.backward(dout)
        dhs = self.decoder.backward(dout)
        dout = self.encoder.backward(dhs)
        return dout

    def generate(self, xs, start_id, sample_size):
        hs = self.encoder.forward(xs)
        sampled = self.decoder.generate(hs, start_id, sample_size)
        return sampled
Reference
Deep Learning from Scratch 2 (밑바닥부터 시작하는 딥러닝 2), Saito Goki