While working on the KLUE RE competition, the baseline code we received was not modularized, so I modularized it and ported it to PyTorch Lightning in the process.
model.py
from importlib import import_module
import numpy as np
import pytorch_lightning as pl
import torch
import transformers
from utils import criterion_entrypoint, klue_re_auprc, klue_re_micro_f1, n_compute_metrics
class Model(pl.LightningModule):
def __init__(self, config):
super().__init__()
self.save_hyperparameters()
# Load the settings from the OmegaConf config.
self.model_name = config.model.model_name
self.lr = config.train.learning_rate
self.lr_sch_use = config.train.lr_sch_use
self.lr_decay_step = config.train.lr_decay_step
self.scheduler_name = config.train.scheduler_name
self.lr_weight_decay = config.train.lr_weight_decay
# Load the pretrained model to use.
self.plm = transformers.AutoModelForSequenceClassification.from_pretrained(
pretrained_model_name_or_path=self.model_name, num_labels=30
)
# Load the loss function used for computing the loss.
self.loss_func = criterion_entrypoint(config.train.loss_name)
# Read the optimizer name from the config.
self.optimizer_name = config.train.optimizer_name
def forward(self, x):
# The model's expected inputs can be found in the Hugging Face source/docs.
x = self.plm(
input_ids=x["input_ids"],
attention_mask=x["attention_mask"],
token_type_ids=x["token_type_ids"],
)
return x["logits"] # Huggingface의 Github에서 모델의 output을 찾을 수 있습니다.
def training_step(self, batch, batch_idx):
# Defines a single training step executed by trainer.fit().
# The batch is the collated output of the Dataset's __getitem__.
x = batch
y = batch["labels"]
logits = self(x)  # forward
loss = self.loss_func(logits, y.long())  # loss between the forward logits and the labels
f1, accuracy = n_compute_metrics(logits, y).values()
self.log("train", {"loss": loss, "f1": f1, "accuracy": accuracy})
return loss  # returned at the end of training_step; Lightning backpropagates this loss
def validation_step(self, batch, batch_idx):
# Defines the validation step run during trainer.fit().
x = batch
y = batch["labels"]
logits = self(x)
loss = self.loss_func(logits, y.long())
f1, accuracy = n_compute_metrics(logits, y).values()
self.log("val_loss", loss)
self.log("val_accuracy", accuracy)
self.log("val_f1", f1, on_step=True)
return {"logits": logits, "y": y} # logits값과 y를 validation_step 종료시 return
def validation_epoch_end(self, outputs):
# outputs is a list of whatever validation_step returned for each batch:
# outputs = [{'logits': batch_0_logits, 'y': batch_0_y}, ..., {'logits': batch_n_logits, 'y': batch_n_y}]
# Pull the logits and y out of each entry and concatenate them with torch.cat.
logits = torch.cat([x["logits"] for x in outputs])
y = torch.cat([x["y"] for x in outputs])
# Move everything to the CPU before passing it to the metric function.
logits = logits.detach().cpu().numpy()
y = y.detach().cpu()
auprc = klue_re_auprc(logits, y)
self.log("val_auprc", auprc)
def test_step(self, batch, batch_idx):
# Defines the test step run by trainer.test().
x = batch
y = batch["labels"]
logits = self(x)
f1, accuracy = n_compute_metrics(logits, y).values()
self.log("test_f1", f1)
return {"logits": logits, "y": y}
def test_epoch_end(self, outputs):
logits = torch.cat([x["logits"] for x in outputs])
y = torch.cat([x["y"] for x in outputs])
logits = logits.detach().cpu().numpy()
y = y.detach().cpu()
auprc = klue_re_auprc(logits, y)
self.log("test_auprc", auprc)
def predict_step(self, batch, batch_idx):
logits = self(batch)
return logits  # tensor of shape (batch_size, num_labels=30)
def configure_optimizers(self):
opt_module = getattr(import_module("torch.optim"), self.optimizer_name)
if self.lr_weight_decay:
optimizer = opt_module(filter(lambda p: p.requires_grad, self.parameters()), lr=self.lr, weight_decay=0.01)
else:
optimizer = opt_module(
filter(lambda p: p.requires_grad, self.parameters()),
lr=self.lr,
# weight_decay=5e-4
)
if self.lr_sch_use:
t_total = self.trainer.estimated_stepping_batches  # total training steps (roughly len(train_dataloader) * max_epochs); available in Lightning >= 1.6
warmup_step = int(t_total * 0.1)
_scheduler_dic = {
"StepLR": torch.optim.lr_scheduler.StepLR(optimizer, self.lr_decay_step, gamma=0.5),
"ReduceLROnPlateau": torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=10),
"CosineAnnealingLR": torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=2, eta_min=0.0),
"constant_warmup": transformers.get_constant_schedule_with_warmup(optimizer, 100),
"cosine_warmup": transformers.get_cosine_schedule_with_warmup(
optimizer, num_warmup_steps=warmup_step, num_training_steps=t_total
),
}
scheduler = _scheduler_dic[self.scheduler_name]
return [optimizer], [scheduler]
else:
return optimizer
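Note: when ReduceLROnPlateau is selected, Lightning also needs to know which logged metric to monitor, so returning it as a bare [optimizer], [scheduler] pair typically raises a MisconfigurationException. A minimal sketch (my addition, not part of the baseline) of how the return in the lr_sch_use branch could be written instead:

lr_scheduler = {
"scheduler": scheduler,  # the entry picked from _scheduler_dic
"interval": "step" if "warmup" in self.scheduler_name else "epoch",  # the warmup schedules are step-based
"monitor": "val_loss",  # only read when the scheduler is ReduceLROnPlateau
}
return [optimizer], [lr_scheduler]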
data.py
import os
import pickle as pickle
import pandas as pd
import pytorch_lightning as pl
import torch
import transformers
from tqdm.auto import tqdm
from utils import *
class Dataset(torch.utils.data.Dataset):
"""Dataset 구성을 위한 Class"""
def __init__(self, pair_dataset, labels):
self.pair_dataset = pair_dataset
self.labels = labels
def __getitem__(self, idx):
item = {key: val[idx].clone().detach() for key, val in self.pair_dataset.items()}
item["labels"] = torch.tensor(self.labels[idx])
return item
def __len__(self):
return len(self.labels)
class Dataloader(pl.LightningDataModule):
def __init__(self, model_name, batch_size, shuffle, train_path, test_path, split_seed=42):
super().__init__()
self.model_name = model_name
self.batch_size = batch_size
self.shuffle = shuffle
self.split_seed = split_seed
self.train_path = train_path
self.test_path = test_path
self.train_dataset = None
self.val_dataset = None
self.test_dataset = None
self.predict_dataset = None
self.tokenizer = transformers.AutoTokenizer.from_pretrained(model_name, max_length=200)
def setup(self, stage="fit"):
if stage == "fit":
# Load the training data.
total_data = load_data(self.train_path)
# Split into train / validation at a 9:1 ratio.
train_data = total_data.sample(frac=0.9, random_state=self.split_seed)
val_data = total_data.drop(train_data.index)
train_label = label_to_num(train_data["label"].values)
val_label = label_to_num(val_data["label"].values)
tokenized_train = tokenized_dataset(train_data, self.tokenizer)
tokenized_val = tokenized_dataset(val_data, self.tokenizer)
self.train_dataset = Dataset(tokenized_train, train_label)
self.val_dataset = Dataset(tokenized_val, val_label)
if stage == "test":
total_data = load_data(self.train_path)
train_data = total_data.sample(frac=0.9, random_state=self.split_seed)
val_data = total_data.drop(train_data.index)
val_label = label_to_num(val_data["label"].values)
tokenized_val = tokenized_dataset(val_data, self.tokenizer)
self.test_dataset = Dataset(tokenized_val, val_label)
if stage == "predict":
p_data = load_data(self.test_path)
p_label = list(map(int, p_data["label"].values))
tokenized_p = tokenized_dataset(p_data, self.tokenizer)
self.predict_dataset = Dataset(tokenized_p, p_label)
def train_dataloader(self):
return torch.utils.data.DataLoader(
self.train_dataset,
batch_size=self.batch_size,
shuffle=self.shuffle,
num_workers=4,
)
def val_dataloader(self):
return torch.utils.data.DataLoader(self.val_dataset, batch_size=self.batch_size, num_workers=4)
def test_dataloader(self):
return torch.utils.data.DataLoader(self.test_dataset, batch_size=self.batch_size, num_workers=4)
def predict_dataloader(self):
return torch.utils.data.DataLoader(self.predict_dataset, batch_size=self.batch_size, num_workers=4)
main.py
import argparse
import os
import re
from datetime import datetime, timedelta
import torch
import wandb
from data_n import *
from model import *
from omegaconf import OmegaConf
from pytorch_lightning.callbacks import ModelCheckpoint, RichProgressBar
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning.loggers import WandbLogger
time_ = datetime.now() + timedelta(hours=9)
time_now = time_.strftime("%m%d%H%M")
wandb_dict = {
"users": "key"
}
if __name__ == "__main__":
# Hyperparameters and other settings are loaded from a config file.
# Example terminal usage: python3 main.py --config=base_config
# If the '--config' argument is omitted, the default 'base_config' is used.
parser = argparse.ArgumentParser()
parser.add_argument("--config", type=str, default="base_config")
args, _ = parser.parse_known_args()
cfg = OmegaConf.load(f"/opt/ml/code/pl/config/{args.config}.yaml")
wandb.login(key=wandb_dict[cfg.wandb.wandb_username])
model_name_ch = re.sub("/", "_", cfg.model.model_name)
wandb_logger = WandbLogger(
log_model="all",
name=f"{cfg.model.saved_name}_{cfg.train.batch_size}_{cfg.train.learning_rate}_{time_now}",
project=cfg.wandb.wandb_project,
entity=cfg.wandb.wandb_entity,
)
"""
By setting workers=True in seed_everything(),
Lightning derives unique seeds across all dataloader workers and processes
for torch, numpy and stdlib random number generators.
When turned on, it ensures that e.g. data augmentations are not repeated across workers.
"""
pl.seed_everything(cfg.train.seed, workers=True)  # fix the random seed
ck_dir_path = f"/opt/ml/code/pl/checkpoint/{model_name_ch}"
if not os.path.exists(ck_dir_path):
os.makedirs(ck_dir_path)
# Checkpoint
checkpoint_callback = ModelCheckpoint(
dirpath=ck_dir_path, filename="{epoch}_{val_loss:.4f}", monitor="val_f1", save_top_k=1, mode="max"
)
# Earlystopping
earlystopping = EarlyStopping(monitor="val_f1", patience=3, mode="max")
# Create the datamodule and the model.
dataloader = Dataloader(
cfg.model.model_name,
cfg.train.batch_size,
cfg.data.shuffle,
cfg.path.train_path,
cfg.path.test_path,
cfg.train.seed,
)
model = Model(cfg)
"""
To ensure full reproducibility from run to run you need to set seeds for
pseudo-random generators, and set deterministic flag in Trainer.
"""
# If you have no GPU, set accelerator="cpu"; with multiple GPUs, set devices to the number of GPUs you want to use.
trainer = pl.Trainer(
precision=16, # Mixed precision(FP16)
accelerator="gpu",
devices=1,
max_epochs=cfg.train.max_epoch,
log_every_n_steps=cfg.train.logging_step,
logger=wandb_logger, # W&B integration
# RichProgressBar() renders a nicer progress output.
callbacks=[earlystopping, checkpoint_callback, RichProgressBar()],
deterministic=True,
# limit_train_batches=0.15, # use only 15% of training data
# limit_val_batches = 0.01, # use only 1% of val data
# limit_train_batches=10 # use only 10 batches of training data
)
trainer.fit(model=model, datamodule=dataloader)
# For testing, the best-performing checkpoint is loaded via ckpt_path="best".
trainer.test(model=model, datamodule=dataloader, ckpt_path="best")
# Save the trained model.
output_dir_path = "output"
if not os.path.exists(output_dir_path):
os.makedirs(output_dir_path)
output_path = os.path.join(output_dir_path, f"{model_name_ch}_{time_now}_model.pt")
torch.save(model.state_dict(), output_path)
Reproducibility is ensured by using pl.seed_everything together with deterministic. precision enables mixed-precision (FP16) training, and limit_train_batches is used to support quick test runs.
Stratified KFold
Stratified KFold (main)
results = []
for k in range(cfg.train.nums_folds):
model = Model(cfg)
datamodule = Dataloader(
cfg.model.model_name,
cfg.train.batch_size,
cfg.data.shuffle,
cfg.path.train_path,
cfg.path.test_path,
k=k,
split_seed=cfg.train.seed,
num_splits=cfg.train.nums_folds,
)
trainer = pl.Trainer(
precision=16,
accelerator="gpu",
devices=1,
max_epochs=cfg.train.max_epoch,
log_every_n_steps=cfg.train.logging_step,
logger=wandb_logger,
callbacks=[checkpoint_callback, earlystopping, RichProgressBar()],
deterministic=True,
# limit_train_batches=0.05,
)
trainer.fit(model=model, datamodule=datamodule)
score = trainer.test(model=model, datamodule=datamodule, ckpt_path="best")
results.extend(score)
# Check the results aggregated across folds.
show_result(results)
Stratified KFold (data)
def setup(self, stage="fit"):
if stage == "fit":
# StratifiedKFold
kf = StratifiedKFold(
n_splits=self.num_splits,
shuffle=True,
random_state=self.split_seed,
)
# Load the training data.
total_data = load_data(self.train_path)
total_label = label_to_num(total_data["label"].values)
tokenized_total = tokenized_dataset(total_data, self.tokenizer)
total_dataset = Dataset(tokenized_total, total_label)
all_splits = list(kf.split(total_dataset, total_label))
# Select the k-th fold.
train_indexes, val_indexes = all_splits[self.k]
train_indexes, val_indexes = train_indexes.tolist(), val_indexes.tolist()
# Split the dataset according to the fold indices.
self.train_dataset = [total_dataset[x] for x in train_indexes]
self.val_dataset = [total_dataset[x] for x in val_indexes]
if stage == "test":
total_data = load_data(self.train_path)
train_data = total_data.sample(frac=0.9, random_state=self.split_seed)
val_data = total_data.drop(train_data.index)
val_label = label_to_num(val_data["label"].values)
tokenized_val = tokenized_dataset(val_data, self.tokenizer)
self.test_dataset = Dataset(tokenized_val, val_label)
if stage == "predict":
p_data = load_data(self.test_path)
p_label = list(map(int, p_data["label"].values))
tokenized_p = tokenized_dataset(p_data, self.tokenizer)
self.predict_dataset = Dataset(tokenized_p, p_label)
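The fold-aware setup() above assumes that the Dataloader also receives k and num_splits, which the constructor in data.py does not accept. A sketch of the extended constructor under that assumption (the argument names are taken from the call in the Stratified KFold main loop; the defaults are my own):

from sklearn.model_selection import StratifiedKFold  # required by the fold-aware setup()

def __init__(self, model_name, batch_size, shuffle, train_path, test_path, k=0, split_seed=42, num_splits=5):
super().__init__()
self.model_name = model_name
self.batch_size = batch_size
self.shuffle = shuffle
self.k = k  # index of the fold used for this run
self.split_seed = split_seed
self.num_splits = num_splits  # total number of StratifiedKFold splits
self.train_path = train_path
self.test_path = test_path
self.train_dataset = None
self.val_dataset = None
self.test_dataset = None
self.predict_dataset = None
self.tokenizer = transformers.AutoTokenizer.from_pretrained(model_name, max_length=200)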
R-ROBERTa
class FCLayer(pl.LightningModule):
def __init__(self, input_dim, output_dim, dropout_rate=0.0, use_activation=True):
super().__init__()
self.save_hyperparameters()
self.use_activation = use_activation
self.dropout = torch.nn.Dropout(dropout_rate)
self.linear = torch.nn.Linear(input_dim, output_dim)
self.tanh = torch.nn.Tanh()
torch.nn.init.xavier_uniform_(self.linear.weight)
def forward(self, x):
x = self.dropout(x)
if self.use_activation:
x = self.tanh(x)
return self.linear(x)
class Model(pl.LightningModule):
def __init__(self, config):
super().__init__()
self.save_hyperparameters()
self.model_name = config.model.model_name
self.lr = config.train.learning_rate
self.lr_sch_use = config.train.lr_sch_use
self.lr_decay_step = config.train.lr_decay_step
self.scheduler_name = config.train.scheduler_name
self.lr_weight_decay = config.train.lr_weight_decay
self.dr_rate = 0
self.hidden_size = 1024
self.num_classes = 30
# Load the pretrained encoder (without its pooling layer).
self.plm = transformers.RobertaModel.from_pretrained(self.model_name, add_pooling_layer=False)
self.cls_fc = FCLayer(self.hidden_size, self.hidden_size // 2, self.dr_rate)
self.sentence_fc = FCLayer(self.hidden_size, self.hidden_size // 2, self.dr_rate)
self.label_classifier = FCLayer(self.hidden_size // 2 * 3, self.num_classes, self.dr_rate, False)
# Load the loss function (CE by default) used for computing the loss.
self.loss_func = criterion_entrypoint(config.train.loss_name)
self.optimizer_name = config.train.optimizer_name
def forward(self, x):
out = self.plm(
input_ids=x["input_ids"],
attention_mask=x["attention_mask"],
token_type_ids=x["token_type_ids"],
)[0]
sentence_end_position = torch.where(x["input_ids"] == 2)[1]
sent1_end, sent2_end = sentence_end_position[0], sentence_end_position[1]
cls_vector = out[:, 0, :] # take <s> token (equiv. to [CLS])
prem_vector = out[:, 1:sent1_end] # Get Premise vector
hypo_vector = out[:, sent1_end + 1 : sent2_end] # Get Hypothesis vector
prem_vector = torch.mean(prem_vector, dim=1) # Average
hypo_vector = torch.mean(hypo_vector, dim=1)
# Dropout -> tanh -> fc_layer (Share FC layer for premise and hypothesis)
cls_embedding = self.cls_fc(cls_vector)
prem_embedding = self.sentence_fc(prem_vector)
hypo_embedding = self.sentence_fc(hypo_vector)
# Concat -> fc_layer
concat_embedding = torch.cat([cls_embedding, prem_embedding, hypo_embedding], dim=-1)
return self.label_classifier(concat_embedding)
utils
utils.py
import pickle
import numpy as np
import pandas as pd
import sklearn
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from tqdm.auto import tqdm
def preprocessing_dataset(dataset):
"""처음 불러온 csv 파일을 원하는 형태의 DataFrame으로 변경 시켜줍니다."""
subject_entity = []
object_entity = []
for i, j in tqdm(zip(dataset["subject_entity"], dataset["object_entity"]), desc="preprocessing"):
i = i[1:-1].split(",")[0].split(":")[1]
j = j[1:-1].split(",")[0].split(":")[1]
subject_entity.append(i)
object_entity.append(j)
out_dataset = pd.DataFrame(
{
"id": dataset["id"],
"sentence": dataset["sentence"],
"subject_entity": subject_entity,
"object_entity": object_entity,
"label": dataset["label"],
}
)
return out_dataset
def tokenized_dataset(dataset, tokenizer):
"""tokenizer에 따라 sentence를 tokenizing 합니다."""
concat_entity = []
for e01, e02 in tqdm(zip(dataset["subject_entity"], dataset["object_entity"]), desc="tokenizing"):
temp = ""
temp = e01 + "[SEP]" + e02
concat_entity.append(temp)
tokenized_sentences = tokenizer(
concat_entity,
list(dataset["sentence"]),
return_tensors="pt",
padding=True,
truncation=True,
max_length=256,
add_special_tokens=True,
)
return tokenized_sentences
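For illustration only (not part of the original utils.py), a single row is encoded roughly as follows; BERT-style tokenizers additionally return token_type_ids, which forward() in model.py expects:

import transformers
tok = transformers.AutoTokenizer.from_pretrained("klue/bert-base")  # example model, not fixed by the config
enc = tok("'이순신'[SEP]'조선'", "이순신은 조선 중기의 무신이다.", return_tensors="pt", padding=True, truncation=True, max_length=256)
print(enc.keys())  # dict_keys(['input_ids', 'token_type_ids', 'attention_mask'])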
def label_to_num(label):
num_label = []
with open("/opt/ml/code/dict_label_to_num.pkl", "rb") as f:
dict_label_to_num = pickle.load(f)
for v in label:
num_label.append(dict_label_to_num[v])
return num_label
def klue_re_micro_f1(preds, labels):
"""KLUE-RE micro f1 (except no_relation)"""
label_list = [
"no_relation",
"org:top_members/employees",
"org:members",
"org:product",
"per:title",
"org:alternate_names",
"per:employee_of",
"org:place_of_headquarters",
"per:product",
"org:number_of_employees/members",
"per:children",
"per:place_of_residence",
"per:alternate_names",
"per:other_family",
"per:colleagues",
"per:origin",
"per:siblings",
"per:spouse",
"org:founded",
"org:political/religious_affiliation",
"org:member_of",
"per:parents",
"org:dissolved",
"per:schools_attended",
"per:date_of_death",
"per:date_of_birth",
"per:place_of_birth",
"per:place_of_death",
"org:founded_by",
"per:religion",
]
no_relation_label_idx = label_list.index("no_relation")
label_indices = list(range(len(label_list)))
label_indices.remove(no_relation_label_idx)
return sklearn.metrics.f1_score(labels, preds, average="micro", labels=label_indices) * 100.0
def klue_re_auprc(probs, labels):
"""KLUE-RE AUPRC (with no_relation)"""
labels = np.eye(30)[labels]
score = np.zeros((30,))
for c in range(30):
targets_c = labels.take([c], axis=1).ravel()
preds_c = probs.take([c], axis=1).ravel()
precision, recall, _ = sklearn.metrics.precision_recall_curve(targets_c, preds_c)
score[c] = sklearn.metrics.auc(recall, precision)
return np.average(score) * 100.0
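klue_re_auprc expects an (N, 30) array of class scores and N integer labels. A minimal illustrative call with dummy inputs (my own example, chosen so that every class has at least one positive sample):

probs = np.random.rand(30, 30)  # e.g. softmax scores for 30 samples
labels = np.arange(30)  # one sample per class
print(klue_re_auprc(probs, labels))  # mean per-class AUPRC, scaled to 0-100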
def compute_metrics(pred):
"""validation을 위한 metrics function"""
labels = pred.label_ids
preds = pred.predictions.argmax(-1)
probs = pred.predictions
# calculate accuracy using sklearn's function
f1 = klue_re_micro_f1(preds, labels)
auprc = klue_re_auprc(probs, labels)
acc = accuracy_score(labels, preds)  # not included in the leaderboard evaluation
return {
"micro f1 score": f1,
"auprc": auprc,
"accuracy": acc,
}
def n_compute_metrics(logits, y):
"""refactoring된 코드를 위한 compute_metrics"""
logits = logits.detach().cpu()
y = y.detach().cpu()
pred = np.argmax(logits.numpy(), axis=-1)
# calculate accuracy using sklearn's function
f1 = klue_re_micro_f1(pred, y)
acc = accuracy_score(y, pred)  # not included in the leaderboard evaluation
return {
"micro f1 score": f1,
"accuracy": acc,
}
def load_data(dataset_dir):
"""csv 파일을 경로에 맡게 불러 옵니다."""
pd_dataset = pd.read_csv(dataset_dir)
dataset = preprocessing_dataset(pd_dataset)
return dataset
def num_to_label(label):
"""
숫자로 되어 있던 class를 원본 문자열 라벨로 변환 합니다.
"""
origin_label = []
with open("/opt/ml/code/dict_num_to_label.pkl", "rb") as f:
dict_num_to_label = pickle.load(f)
for v in label:
origin_label.append(dict_num_to_label[v])
return origin_label
def make_output(logits):
"""
batch 단위의 logits을 풀고 prob와 pred를 통해 csv파일을 만듭니다.
"""
logits = torch.cat([x for x in logits])
prob = F.softmax(logits, dim=-1).tolist()
pred = np.argmax(logits, axis=-1).tolist()
pred_a = num_to_label(pred)
output = pd.DataFrame({"id": 0, "pred_label": pred_a, "probs": prob})
output["id"] = range(0, len(output))
output.to_csv("./submission.csv", index=False)
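make_output consumes the list of batch logits produced by predict_step. A usage sketch, assuming the trainer, model and dataloader objects built in main.py:

logits = trainer.predict(model=model, datamodule=dataloader)  # list of (batch_size, 30) logit tensors
make_output(logits)  # writes ./submission.csv with pred_label and probs columns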
def show_result(result):
f1 = 0
au = 0
for i, x in enumerate(result):
f1 += x["test_f1"]
au += x["test_auprc"]
print("----------------------")
print(f"{i+1}번 Fold")
print(f"F1 score : {x['test_f1']:.2f}")
print(f"AUPRC score : {x['test_auprc']:.2f}")
print("----------------------")
print(f"Average F1 score : {f1/5:.2f}")
print(f"Average AUPRC score : {au/5:.2f}")
print("----------------------")
# loss functions
# https://discuss.pytorch.org/t/is-this-a-correct-implementation-for-focal-loss-in-pytorch/43327/8
class FocalLoss(nn.Module):
def __init__(self, weight=None, gamma=0.5, reduction="mean"):
nn.Module.__init__(self)
self.weight = weight
self.gamma = gamma
self.reduction = reduction
def forward(self, input_tensor, target_tensor):
log_prob = F.log_softmax(input_tensor, dim=-1)
prob = torch.exp(log_prob)
return F.nll_loss(
((1 - prob) ** self.gamma) * log_prob, target_tensor, weight=self.weight, reduction=self.reduction
)
# class FocalLoss(nn.Module):
# def __init__(self, alpha=1, gamma=2):
# super(FocalLoss, self).__init__()
# self.alpha = alpha
# self.gamma = gamma
# def forward(self, outputs, targets):
# ce_loss = torch.nn.functional.cross_entropy(outputs, targets, reduction="none")
# pt = torch.exp(-ce_loss)
# focal_loss = (self.alpha * (1 - pt) ** self.gamma * ce_loss).mean()
# return focal_loss
class LabelSmoothingLoss(nn.Module):
def __init__(self, classes=30, smoothing=0.0, dim=-1):  # 30 labels for KLUE RE; smoothing=0.0 reduces to plain CE
super(LabelSmoothingLoss, self).__init__()
self.confidence = 1.0 - smoothing
self.smoothing = smoothing
self.cls = classes
self.dim = dim
def forward(self, pred, target):
pred = pred.log_softmax(dim=self.dim)
with torch.no_grad():
true_dist = torch.zeros_like(pred)
true_dist.fill_(self.smoothing / (self.cls - 1))
true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
return torch.mean(torch.sum(-true_dist * pred, dim=self.dim))
# https://gist.github.com/SuperShinyEyes/dcc68a08ff8b615442e3bc6a9b55a354
class F1Loss(nn.Module):
def __init__(self, classes=30, epsilon=1e-7):
super().__init__()
self.classes = classes
self.epsilon = epsilon
def forward(self, y_pred, y_true):
assert y_pred.ndim == 2
assert y_true.ndim == 1
y_true = F.one_hot(y_true, self.classes).to(torch.float32)
y_pred = F.softmax(y_pred, dim=1)
tp = (y_true * y_pred).sum(dim=0).to(torch.float32)
tn = ((1 - y_true) * (1 - y_pred)).sum(dim=0).to(torch.float32)
fp = ((1 - y_true) * y_pred).sum(dim=0).to(torch.float32)
fn = (y_true * (1 - y_pred)).sum(dim=0).to(torch.float32)
precision = tp / (tp + fp + self.epsilon)
recall = tp / (tp + fn + self.epsilon)
f1 = 2 * (precision * recall) / (precision + recall + self.epsilon)
f1 = f1.clamp(min=self.epsilon, max=1 - self.epsilon)
return 1 - f1.mean()
_criterion_entrypoints = {
"CrossEntropy": nn.CrossEntropyLoss(),
"focal": FocalLoss(),
"label_smoothing": LabelSmoothingLoss(),
"f1": F1Loss(),
}
def criterion_entrypoint(criterion_name):
return _criterion_entrypoints[criterion_name]
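config.train.loss_name indexes into the registry above via criterion_entrypoint. A minimal usage sketch with random tensors:

loss_fn = criterion_entrypoint("focal")  # -> FocalLoss()
loss = loss_fn(torch.randn(4, 30), torch.tensor([1, 0, 7, 29]))  # (logits, integer class labels)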
Tuning
Tune.py
"""
batch_size
"""
trainer = pl.Trainer(auto_scale_batch_size="binsearch")
trainer.tuner.scale_batch_size(model=model, datamodule=dataloader)
"""
learning_rate
"""
trainer = pl.Trainer(auto_lr_find=True)
trainer.tuner.lr_find(model=model, datamodule=dataloader, num_training=300)
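Both tuner calls return result objects; for the learning-rate search in particular, the suggested value can be read back and applied before training. A sketch using the same 1.x Tuner API as above:

lr_finder = trainer.tuner.lr_find(model=model, datamodule=dataloader, num_training=300)
model.lr = lr_finder.suggestion()  # adopt the suggested learning rate before calling trainer.fit()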