[NLP] How to Combine CRF, BiLSTM, and BiGRU with BERT

잉송 · February 14, 2022

Requirements

  • Python 3.8.3
  • transformers 4.16.2
  • torchcrf 0.0.1


KoBERT + CRF

import torch
import torch.nn as nn
from torchcrf import CRF

# `num` (the number of entity tags) is assumed to be defined before the classes below.
class KobertCRF(nn.Module):
    """ KoBERT with CRF """
    def __init__(self, bert, hidden_size=768, num_classes=num, dr_rate=0.3, params=None):
        super(KobertCRF, self).__init__()

        self.bert = bert  # pre-trained KoBERT encoder

        self.dropout = nn.Dropout(dr_rate)
        # projects each hidden state to per-tag emission scores for the CRF
        self.position_wise_ff = nn.Linear(hidden_size, num_classes)
        self.crf = CRF(num_tags=num_classes, batch_first=True)

    def gen_attention_mask(self, token_ids, valid_length):
        # 1 for real tokens, 0 for padding
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, input_ids, valid_length, token_type_ids=None, tags=None):
        attention_mask = self.gen_attention_mask(input_ids, valid_length)

        # outputs[0]: last hidden states, shape (batch_size, seq_len, hidden_size)
        outputs = self.bert(input_ids=input_ids,
                            token_type_ids=token_type_ids,
                            attention_mask=attention_mask.float().to(input_ids.device))
        last_encoder_layer = outputs[0]
        last_encoder_layer = self.dropout(last_encoder_layer)
        emissions = self.position_wise_ff(last_encoder_layer)

        if tags is not None:
            # the CRF forward pass returns the log-likelihood of the gold tags; decode runs Viterbi
            log_likelihood, sequence_of_tags = self.crf(emissions, tags), self.crf.decode(emissions)
            return log_likelihood, sequence_of_tags
        else:
            sequence_of_tags = self.crf.decode(emissions)
            return sequence_of_tags
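
To sanity-check the class, here is a minimal usage sketch. The checkpoint name and the dummy tensors are my own illustrative assumptions, not part of the original post.

import torch
from transformers import BertModel

# assumes `num` (the tag count) was already defined before the KobertCRF class above
bert = BertModel.from_pretrained("monologg/kobert")  # or whichever KoBERT checkpoint you use
model = KobertCRF(bert=bert)

input_ids = torch.randint(1, 8000, (2, 64))  # dummy token ids: batch of 2, seq_len 64
valid_length = torch.tensor([50, 30])        # real lengths before padding
tags = torch.randint(0, num, (2, 64))        # dummy gold tag ids

log_likelihood, pred_tags = model(input_ids, valid_length, tags=tags)
loss = -log_likelihood  # maximize the log-likelihood by minimizing its negative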

KoElectra + CRF

class KoElectraCRF(nn.Module):
    """ KoELECTRA with CRF """
    def __init__(self, bert, hidden_size=768, num_classes=num, dr_rate=0.3, params=None):
        super(KoElectraCRF, self).__init__()

        self.bert = bert  # pre-trained KoELECTRA encoder
        # tokenizer is assumed to be defined globally; the pad token id marks padding positions
        self._pad_id = tokenizer.pad_token_id

        self.dropout = nn.Dropout(dr_rate)
        self.position_wise_ff = nn.Linear(hidden_size, num_classes)
        self.crf = CRF(num_tags=num_classes, batch_first=True)

    def forward(self, input_ids, attention_mask, tags=None):
        # outputs[0]: last hidden states, shape (batch_size, seq_len, hidden_size)
        outputs = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask.float().to(input_ids.device))
        last_encoder_layer = outputs[0]
        last_encoder_layer = self.dropout(last_encoder_layer)
        emissions = self.position_wise_ff(last_encoder_layer)

        if tags is not None:
            log_likelihood, sequence_of_tags = self.crf(emissions, tags), self.crf.decode(emissions)
            return log_likelihood, sequence_of_tags
        else:
            sequence_of_tags = self.crf.decode(emissions)
            return sequence_of_tags
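
A similar usage sketch for the KoELECTRA version; the checkpoint name below is only an example, and since the class reads the global tokenizer, the tokenizer has to be created before the model is instantiated.

from transformers import ElectraModel, ElectraTokenizer

tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")
bert = ElectraModel.from_pretrained("monologg/koelectra-base-v3-discriminator")
model = KoElectraCRF(bert=bert)

enc = tokenizer(["한국어 개체명 인식 예시 문장입니다."],
                padding="max_length", max_length=256,
                truncation=True, return_tensors="pt")

# inference: without tags, only the Viterbi-decoded tag sequence is returned
pred_tags = model(enc["input_ids"], enc["attention_mask"])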


When adding a BiLSTM or BiGRU, the pack_padded_sequence and pad_packed_sequence functions come into play, and this is where I hit errors. There are a few things to watch out for. The call needs to be pack_padded_sequence(Tensor, Tensor, batch_first=True, enforce_sorted=False), and the lengths tensor must be on the CPU, so the seq_length in the original source code has to become seq_length.cpu(). In pad_packed_sequence you also have to pass total_length=max_len, which the original code did not; without it the unpacked output is only as long as the longest sequence in the batch, so it can end up shorter than the padded tag sequence the CRF expects. In my case max_len was 256, so I wrote 256. Attaching a BiLSTM or BiGRU works the same way for KoBERT and KoELECTRA, so I will only show the KoELECTRA versions.
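
Here is the pack/unpack step in isolation, as a minimal sketch; the batch size, lengths, and max_len=256 are illustrative, only the call pattern matters.

import torch
from torch import nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

max_len = 256                                  # same padded length as the tags/emissions
hidden = torch.randn(4, max_len, 768)          # stand-in for the encoder output
seq_length = torch.tensor([256, 120, 87, 10])  # real (unpadded) length of each example

bilstm = nn.LSTM(768, 384, batch_first=True, bidirectional=True)

# lengths must be a CPU tensor; enforce_sorted=False allows unsorted batches
packed = pack_padded_sequence(hidden, seq_length.cpu(), batch_first=True, enforce_sorted=False)
out, _ = bilstm(packed)

# total_length keeps the unpacked output at max_len so it lines up with the tag sequence
out, _ = pad_packed_sequence(out, batch_first=True, total_length=max_len)
print(out.shape)  # torch.Size([4, 256, 768])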

KoElectra + BiLSTM + CRF

from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

class KoElectraBiLSTMCRF(nn.Module):
    """ KoELECTRA with BiLSTM and CRF """
    def __init__(self, bert, hidden_size=768, num_classes=num, dr_rate=0.3, params=None):
        super(KoElectraBiLSTMCRF, self).__init__()

        self.bert = bert  # pre-trained KoELECTRA encoder
        # tokenizer is assumed to be defined globally; the pad token id marks padding positions
        self._pad_id = tokenizer.pad_token_id

        self.dropout = nn.Dropout(dr_rate)
        # bidirectional LSTM: hidden_size // 2 per direction keeps the output at hidden_size
        self.bilstm = nn.LSTM(hidden_size, hidden_size // 2, dropout=dr_rate, batch_first=True, bidirectional=True)
        self.position_wise_ff = nn.Linear(hidden_size, num_classes)
        self.crf = CRF(num_tags=num_classes, batch_first=True)

    def forward(self, input_ids, attention_mask, tags=None, using_pack_sequence=True):
        # number of real (non-pad) tokens per example, needed by pack_padded_sequence
        seq_length = input_ids.ne(self._pad_id).sum(dim=1)

        # outputs[0]: last hidden states, shape (batch_size, seq_len, hidden_size)
        outputs = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask.float().to(input_ids.device))
        last_encoder_layer = outputs[0]
        last_encoder_layer = self.dropout(last_encoder_layer)

        if using_pack_sequence:
            # lengths must be on the CPU; enforce_sorted=False allows unsorted batches
            pack_padded_last_encoder_layer = pack_padded_sequence(last_encoder_layer, seq_length.cpu(), batch_first=True, enforce_sorted=False)
            outputs, hc = self.bilstm(pack_padded_last_encoder_layer)
            # total_length=256 (max_len) pads the output back to the same length as the tags
            outputs = pad_packed_sequence(outputs, batch_first=True, padding_value=self._pad_id, total_length=256)[0]
        else:
            outputs, hc = self.bilstm(last_encoder_layer)
        emissions = self.position_wise_ff(outputs)

        if tags is not None:
            log_likelihood, sequence_of_tags = self.crf(emissions, tags), self.crf.decode(emissions)
            return log_likelihood, sequence_of_tags
        else:
            sequence_of_tags = self.crf.decode(emissions)
            return sequence_of_tags

KoElectra + BiGRU + CRF

class KoElectraBiGRUCRF(nn.Module):
    """ KoELECTRA with BiGRU and CRF """
    def __init__(self, bert, hidden_size=768, num_classes=num, dr_rate=0.3, params=None):
        super(KoElectraBiGRUCRF, self).__init__()

        self.bert = bert  # pre-trained KoELECTRA encoder
        self._pad_id = tokenizer.pad_token_id

        self.dropout = nn.Dropout(dr_rate)
        # bidirectional GRU: hidden_size // 2 per direction keeps the output at hidden_size
        self.bigru = nn.GRU(hidden_size, hidden_size // 2, dropout=dr_rate, batch_first=True, bidirectional=True)
        self.position_wise_ff = nn.Linear(hidden_size, num_classes)
        self.crf = CRF(num_tags=num_classes, batch_first=True)

    def forward(self, input_ids, attention_mask, tags=None, using_pack_sequence=True):
        # number of real (non-pad) tokens per example, needed by pack_padded_sequence
        seq_length = input_ids.ne(self._pad_id).sum(dim=1)

        # outputs[0]: last hidden states, shape (batch_size, seq_len, hidden_size)
        outputs = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask.float().to(input_ids.device))
        last_encoder_layer = outputs[0]
        last_encoder_layer = self.dropout(last_encoder_layer)

        if using_pack_sequence:
            # lengths must be on the CPU; enforce_sorted=False allows unsorted batches
            pack_padded_last_encoder_layer = pack_padded_sequence(last_encoder_layer, seq_length.cpu(), batch_first=True, enforce_sorted=False)
            outputs, hidden = self.bigru(pack_padded_last_encoder_layer)
            # total_length=256 (max_len) pads the output back to the same length as the tags
            outputs = pad_packed_sequence(outputs, batch_first=True, padding_value=self._pad_id, total_length=256)[0]
        else:
            outputs, hidden = self.bigru(last_encoder_layer)
        emissions = self.position_wise_ff(outputs)

        if tags is not None:
            log_likelihood, sequence_of_tags = self.crf(emissions, tags), self.crf.decode(emissions)
            return log_likelihood, sequence_of_tags
        else:
            sequence_of_tags = self.crf.decode(emissions)
            return sequence_of_tags
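
One thing the snippets above leave implicit: the CRF forward pass returns a log-likelihood, so the training loss is its negative. Below is a rough sketch of a single training step for the KoELECTRA variants; the optimizer choice and the batch tensors are placeholders, not part of the original post.

import torch

# model: one of the KoELECTRA classes above; input_ids / attention_mask / tags: one padded batch
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

model.train()
log_likelihood, _ = model(input_ids, attention_mask, tags=tags)
loss = -log_likelihood  # the CRF returns log p(tags | emissions); minimize its negative
loss.backward()
optimizer.step()
optimizer.zero_grad()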