import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from torchcrf import CRF  # pip install pytorch-crf

class KobertCRF(nn.Module):
    """ KoBERT with CRF """
    def __init__(self, bert, hidden_size=768, num_classes=num, dr_rate=0.3, params=None):
        super(KobertCRF, self).__init__()
        self.bert = bert
        self.dropout = nn.Dropout(dr_rate)
        self.position_wise_ff = nn.Linear(hidden_size, num_classes)  # token-level emission scores
        self.crf = CRF(num_tags=num_classes, batch_first=True)

    def gen_attention_mask(self, token_ids, valid_length):
        # mark only the valid (non-padded) positions with 1
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, input_ids, valid_length, token_type_ids=None, tags=None):
        attention_mask = self.gen_attention_mask(input_ids, valid_length)
        # outputs: (last_encoder_layer, pooled_output, attention_weight)
        outputs = self.bert(input_ids=input_ids,
                            token_type_ids=token_type_ids,
                            attention_mask=attention_mask.float().to(input_ids.device))
        last_encoder_layer = outputs[0]
        last_encoder_layer = self.dropout(last_encoder_layer)
        emissions = self.position_wise_ff(last_encoder_layer)
        if tags is not None:
            # training: return the CRF log likelihood together with the decoded tag sequence
            log_likelihood, sequence_of_tags = self.crf(emissions, tags), self.crf.decode(emissions)
            return log_likelihood, sequence_of_tags
        else:
            sequence_of_tags = self.crf.decode(emissions)
            return sequence_of_tags
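For reference, here is a minimal usage sketch of the class above. It assumes the KoBERT backbone is loaded with get_pytorch_kobert_model() from the kobert package and that there are 10 tag classes; both are illustrative assumptions, and the tensors are dummy values. The point is that the CRF returns a log likelihood, so the training loss is its negation.

# Hedged usage sketch: get_pytorch_kobert_model() and num_classes=10 are assumptions for illustration.
from kobert.pytorch_kobert import get_pytorch_kobert_model

bert_model, vocab = get_pytorch_kobert_model()
model = KobertCRF(bert=bert_model, num_classes=10)

input_ids = torch.randint(0, 8000, (2, 256))      # dummy token ids, (batch, max_len)
valid_length = torch.tensor([100, 180])           # true length of each example
token_type_ids = torch.zeros_like(input_ids)
tags = torch.randint(0, 10, (2, 256))             # dummy gold tag indices

log_likelihood, sequence_of_tags = model(input_ids, valid_length, token_type_ids, tags)
loss = -1 * log_likelihood                        # negative log likelihood is the training loss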
class KoElectraCRF(nn.Module):
    """ KoElectra with CRF """
    def __init__(self, bert, hidden_size=768, num_classes=num, dr_rate=0.3, params=None):
        super(KoElectraCRF, self).__init__()
        self.bert = bert
        self._pad_id = tokenizer._pad_token_type_id
        self.dropout = nn.Dropout(dr_rate)
        self.position_wise_ff = nn.Linear(hidden_size, num_classes)
        self.crf = CRF(num_tags=num_classes, batch_first=True)

    def forward(self, input_ids, attention_mask, tags=None, using_pack_sequence=True):
        # using_pack_sequence is unused here; it only matters for the BiLSTM/BiGRU variants below
        # outputs: (last_encoder_layer, pooled_output, attention_weight)
        outputs = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask.float().to(input_ids.device))
        last_encoder_layer = outputs[0]
        last_encoder_layer = self.dropout(last_encoder_layer)
        emissions = self.position_wise_ff(last_encoder_layer)
        if tags is not None:
            log_likelihood, sequence_of_tags = self.crf(emissions, tags), self.crf.decode(emissions)
            return log_likelihood, sequence_of_tags
        else:
            sequence_of_tags = self.crf.decode(emissions)
            return sequence_of_tags
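The KoElectra version can be wired up in much the same way. A minimal sketch, assuming the monologg/koelectra-base-v3-discriminator checkpoint from Hugging Face and 10 tag classes (both illustrative); note that the class above reads tokenizer._pad_token_type_id, so the tokenizer has to exist before the model is instantiated.

# Hedged sketch: the checkpoint name and num_classes=10 are assumptions for illustration.
from transformers import ElectraModel, ElectraTokenizer

tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")
electra = ElectraModel.from_pretrained("monologg/koelectra-base-v3-discriminator")
model = KoElectraCRF(bert=electra, num_classes=10)

batch = tokenizer(["이순신은 조선 중기의 무신이다."], padding="max_length",
                  truncation=True, max_length=256, return_tensors="pt")
sequence_of_tags = model(batch["input_ids"], batch["attention_mask"])  # no tags -> decode only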
When attaching a BiLSTM or BiGRU, the pack_padded_sequence and pad_packed_sequence functions are used, and this is where I ran into errors. There are a few things to watch out for. The call has to be pack_padded_sequence(Tensor, Tensor, batch_first=True, enforce_sorted=False), so the seq_length from the original source code has to be changed to seq_length.cpu(). Also, pad_packed_sequence needs total_length=max_len, which was not in the original code; in my case max_len is 256, so I wrote 256. Attaching a BiLSTM or BiGRU to KoBERT works the same way as for KoElectra, so I will only show the KoElectra version as an example.
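Before the full classes, here is a minimal standalone sketch of just these two fixes (the shapes and max_len=256 are illustrative): the lengths tensor has to be moved to the CPU, and total_length restores the fixed sequence length that the CRF layer expects.

# Standalone illustration of the two fixes; shapes are arbitrary example values.
max_len, hidden = 256, 768
x = torch.randn(4, max_len, hidden)               # encoder output (batch, max_len, hidden)
seq_length = torch.tensor([200, 256, 50, 120])    # true lengths; may live on the GPU in practice
bilstm = nn.LSTM(hidden, hidden // 2, batch_first=True, bidirectional=True)

# 1) the lengths argument must be a CPU tensor -> seq_length.cpu()
packed = pack_padded_sequence(x, seq_length.cpu(), batch_first=True, enforce_sorted=False)
out, _ = bilstm(packed)

# 2) without total_length the output is only padded up to max(seq_length);
#    total_length=max_len brings it back to the fixed (batch, 256, hidden) shape
out, _ = pad_packed_sequence(out, batch_first=True, total_length=max_len)
print(out.shape)  # torch.Size([4, 256, 768])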
class KoElectraCRF(nn.Module):
    """ KoElectra + BiLSTM with CRF """
    def __init__(self, bert, hidden_size=768, num_classes=num, dr_rate=0.3, params=None):
        super(KoElectraCRF, self).__init__()
        self.bert = bert
        self._pad_id = tokenizer._pad_token_type_id
        self.dropout = nn.Dropout(dr_rate)
        self.bilstm = nn.LSTM(hidden_size, hidden_size // 2, dropout=dr_rate, batch_first=True, bidirectional=True)
        self.position_wise_ff = nn.Linear(hidden_size, num_classes)
        self.crf = CRF(num_tags=num_classes, batch_first=True)

    def forward(self, input_ids, attention_mask, tags=None, using_pack_sequence=True):
        # count the non-pad tokens per example to get the true sequence lengths
        seq_length = input_ids.ne(self._pad_id).sum(dim=1)
        # outputs: (last_encoder_layer, pooled_output, attention_weight)
        outputs = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask.float().to(input_ids.device))
        last_encoder_layer = outputs[0]
        last_encoder_layer = self.dropout(last_encoder_layer)
        if using_pack_sequence is True:
            # lengths must be on the CPU, hence seq_length.cpu()
            pack_padded_last_encoder_layer = pack_padded_sequence(last_encoder_layer, seq_length.cpu(), batch_first=True, enforce_sorted=False)
            outputs, hc = self.bilstm(pack_padded_last_encoder_layer)
            # total_length=256 (= max_len) pads the output back to the fixed sequence length
            outputs = pad_packed_sequence(outputs, batch_first=True, padding_value=self._pad_id, total_length=256)[0]
        else:
            outputs, hc = self.bilstm(last_encoder_layer)
        emissions = self.position_wise_ff(outputs)
        if tags is not None:
            log_likelihood, sequence_of_tags = self.crf(emissions, tags), self.crf.decode(emissions)
            return log_likelihood, sequence_of_tags
        else:
            sequence_of_tags = self.crf.decode(emissions)
            return sequence_of_tags
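For completeness, a hedged sketch of a training step with the BiLSTM variant above; the optimizer choice, learning rate, and train_loader are assumptions for illustration. As before, the CRF forward returns a log likelihood, so it is negated to obtain the loss.

# Hedged training-step sketch: optimizer choice, learning rate, and train_loader are assumptions.
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

model.train()
for input_ids, attention_mask, tags in train_loader:   # train_loader yields padded batches (assumed)
    optimizer.zero_grad()
    log_likelihood, _ = model(input_ids, attention_mask, tags)
    loss = -1 * log_likelihood                          # negative CRF log likelihood
    loss.backward()
    optimizer.step()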
class KoElectraCRF(nn.Module):
    """ KoElectra + BiGRU with CRF """
    def __init__(self, bert, hidden_size=768, num_classes=num, dr_rate=0.3, params=None):
        super(KoElectraCRF, self).__init__()
        self.bert = bert
        self._pad_id = tokenizer._pad_token_type_id
        self.dropout = nn.Dropout(dr_rate)
        self.bigru = nn.GRU(hidden_size, hidden_size // 2, dropout=dr_rate, batch_first=True, bidirectional=True)
        self.position_wise_ff = nn.Linear(hidden_size, num_classes)
        self.crf = CRF(num_tags=num_classes, batch_first=True)

    def forward(self, input_ids, attention_mask, tags=None, using_pack_sequence=True):
        # count the non-pad tokens per example to get the true sequence lengths
        seq_length = input_ids.ne(self._pad_id).sum(dim=1)
        # outputs: (last_encoder_layer, pooled_output, attention_weight)
        outputs = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask.float().to(input_ids.device))
        last_encoder_layer = outputs[0]
        last_encoder_layer = self.dropout(last_encoder_layer)
        if using_pack_sequence is True:
            # lengths must be on the CPU, hence seq_length.cpu()
            pack_padded_last_encoder_layer = pack_padded_sequence(last_encoder_layer, seq_length.cpu(), batch_first=True, enforce_sorted=False)
            outputs, hc = self.bigru(pack_padded_last_encoder_layer)
            # total_length=256 (= max_len) pads the output back to the fixed sequence length
            outputs = pad_packed_sequence(outputs, batch_first=True, padding_value=self._pad_id, total_length=256)[0]
        else:
            outputs, hc = self.bigru(last_encoder_layer)
        emissions = self.position_wise_ff(outputs)
        if tags is not None:
            log_likelihood, sequence_of_tags = self.crf(emissions, tags), self.crf.decode(emissions)
            return log_likelihood, sequence_of_tags
        else:
            sequence_of_tags = self.crf.decode(emissions)
            return sequence_of_tags
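Finally, a hedged inference sketch. It reuses the KoElectra tokenizer loaded earlier; the idx2tag dictionary that maps tag indices back to label strings is an assumption and comes from however the dataset's labels were indexed.

# Hedged inference sketch: idx2tag (index -> label string) is an assumption.
model.eval()
with torch.no_grad():
    batch = tokenizer(["이순신은 조선 중기의 무신이다."], padding="max_length",
                      truncation=True, max_length=256, return_tensors="pt")
    sequence_of_tags = model(batch["input_ids"], batch["attention_mask"])  # CRF Viterbi decode only

# crf.decode returns a list of tag-index lists (one per sentence)
predictions = [[idx2tag[t] for t in sent] for sent in sequence_of_tags]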