다시 새롭게 시작하는 마음으로 공부하고자 한다..
yolov8.yaml
먼저, 위 파일에는 yolov8의 구조에 관한 설명이 나와있다.
backbone과 head로 구분이 되며, backbone에는 convolution layer와 c2f, sppf가 있으며, head에도 역시 c2f, convolution layer, 마지막 layer인 detect layer가 존재한다.
module > block.py
module > conv.py
module > head.py
(다음 포스팅에서 다룰 내용)
task.py
class DetectionModel(BaseModel):
"""YOLOv8 detection model."""
def __init__(self, cfg='yolov8n.yaml', ch=3, nc=None, verbose=True): # model, input channels, number of classes, weight는 yolov8n으로 default값
super().__init__()
self.yaml = cfg if isinstance(cfg, dict) else yaml_model_load(cfg) # cfg dict -> 딕셔너리
# Define model
ch = self.yaml['ch'] = self.yaml.get('ch', ch) # input channels #채널 값 얻어서 저장
if nc and nc != self.yaml['nc']: #nc : number classes : 클래스 개수 저장
LOGGER.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}")
self.yaml['nc'] = nc # override yaml value
self.model, self.save = parse_model(deepcopy(self.yaml), ch=ch, verbose=verbose) # model, savelist
self.names = {i: f'{i}' for i in range(self.yaml['nc'])} # default names dict, names라는 딕셔너리에 각 클래스에 대한 라벨명 저장
self.inplace = self.yaml.get('inplace', True)
# Build strides
m = self.model[-1] # Detect() ? head의 detect 의미, yolo의 마지막 layer
if isinstance(m, (Detect, Segment, Pose)): #m이 세가지 중 하나에 속하는 경우
s = 256 # 2x min stride
m.inplace = self.inplace
forward = lambda x: self.forward(x)[0] if isinstance(m, (Segment, Pose)) else self.forward(x) #m이 segment, pose 중 하나에 속하는 경우
m.stride = torch.tensor([s / x.shape[-2] for x in forward(torch.zeros(1, ch, s, s))]) # forward -> 채널 수 x grid x grid
self.stride = m.stride
m.bias_init() # only run once
else: #m이 classifier일 경우
self.stride = torch.Tensor([32]) # default stride for i.e. RTDETR
# Init weights, biases
initialize_weights(self) #weight 값 초기화
if verbose:
self.info()
LOGGER.info('')
def _predict_augment(self, x):
"""Perform augmentations on input image x and return augmented inference and train outputs."""
img_size = x.shape[-2:] # height, width
s = [1, 0.83, 0.67] # scales
f = [None, 3, None] # flips (2-ud, 3-lr)
y = [] # outputs
for si, fi in zip(s, f):
xi = scale_img(x.flip(fi) if fi else x, si, gs=int(self.stride.max()))
yi = super().predict(xi)[0] # forward
# cv2.imwrite(f'img_{si}.jpg', 255 * xi[0].cpu().numpy().transpose((1, 2, 0))[:, :, ::-1]) # save
yi = self._descale_pred(yi, fi, si, img_size) #descale pred
y.append(yi)
y = self._clip_augmented(y) # clip augmented tails
return torch.cat(y, -1), None # augmented inference, train
@staticmethod
def _descale_pred(p, flips, scale, img_size, dim=1):
"""De-scale predictions following augmented inference (inverse operation)."""
p[:, :4] /= scale # de-scale
x, y, wh, cls = p.split((1, 1, 2, p.shape[dim] - 4), dim)
if flips == 2:
y = img_size[0] - y # de-flip ud
elif flips == 3:
x = img_size[1] - x # de-flip lr
return torch.cat((x, y, wh, cls), dim)
def _clip_augmented(self, y):
"""Clip YOLOv5 augmented inference tails."""
nl = self.model[-1].nl # number of detection layers (P3-P5)
g = sum(4 ** x for x in range(nl)) # grid points
e = 1 # exclude layer count
i = (y[0].shape[-1] // g) * sum(4 ** x for x in range(e)) # indices
y[0] = y[0][..., :-i] # large
i = (y[-1].shape[-1] // g) * sum(4 ** (nl - 1 - x) for x in range(e)) # indices
y[-1] = y[-1][..., i:] # small
return y
def init_criterion(self): #손실 함수 초기화
return v8DetectionLoss(self)
engine > trainer.py
task가 detect인 경우, detect에 대한 detectionModel이 model.py 파일에서 생성된다. 그 후 detectionModel에 대한 detectionTrainer를 추가적으로 만들어 task detect에 대한 모델 훈련을 진행하게 된다. 이때, 각 task에 대한 trainer을 생성할 때, 4가지의 task trainer에 대한 baseTrainer가 부모 클래스로 상속하게 된다. 이때 baseTrainer가 trainer.py 파일에 저장되어 있다. 따라서, baseTrainer에 관한 코드를 이해해야만 train 과정을 이해할 수 있다.
def __init__(self, cfg=DEFAULT_CFG, overrides=None, _callbacks=None): #weight : cfg
"""
Initializes the BaseTrainer class.
Args:
cfg (str, optional): Path to a configuration file. Defaults to DEFAULT_CFG. #weight file
overrides (dict, optional): Configuration overrides. Defaults to None.
"""
self.args = get_cfg(cfg, overrides) #overrides에서 cfg weight 얻기
self.device = select_device(self.args.device, self.args.batch)
self.check_resume()
self.validator = None
self.model = None
self.metrics = None
self.plots = {}
init_seeds(self.args.seed + 1 + RANK, deterministic=self.args.deterministic)
# Dirs
project = self.args.project or Path(SETTINGS['runs_dir']) / self.args.task
name = self.args.name or f'{self.args.mode}'
if hasattr(self.args, 'save_dir'):
self.save_dir = Path(self.args.save_dir)
else:
self.save_dir = Path(
increment_path(Path(project) / name, exist_ok=self.args.exist_ok if RANK in (-1, 0) else True))
self.wdir = self.save_dir / 'weights' # weights dir
if RANK in (-1, 0):
self.wdir.mkdir(parents=True, exist_ok=True) # make dir
self.args.save_dir = str(self.save_dir)
yaml_save(self.save_dir / 'args.yaml', vars(self.args)) # save run args
self.last, self.best = self.wdir / 'last.pt', self.wdir / 'best.pt' # checkpoint paths
self.save_period = self.args.save_period
self.batch_size = self.args.batch #args에 저장된 batch 사이즈
self.epochs = self.args.epochs #args에 저장된 epochs 수
self.start_epoch = 0 #start epoch 0으로 설정
if RANK == -1:
print_args(vars(self.args))
# Device
if self.device.type == 'cpu':
self.args.workers = 0 # faster CPU training as time dominated by inference, not dataloading
# Model and Dataset
self.model = self.args.model #모델 저장
try:
if self.args.task == 'classify':
self.data = check_cls_dataset(self.args.data)
elif self.args.data.endswith('.yaml') or self.args.task in ('detect', 'segment'):
self.data = check_det_dataset(self.args.data) #detect dataset인지 체크 후 데이터에 저장
if 'yaml_file' in self.data: #데이터셋에 yaml file이 있는 경우
self.args.data = self.data['yaml_file'] # for validating 'yolo train data=url.zip' usage
except Exception as e:
raise RuntimeError(emojis(f"Dataset '{clean_url(self.args.data)}' error ❌ {e}")) from e
self.trainset, self.testset = self.get_dataset(self.data) #데이터셋을 trainset과 testset으로 나누기
self.ema = None
# Optimization utils init
self.lf = None
self.scheduler = None
# Epoch level metrics, 변수값 None으로 초기화
self.best_fitness = None
self.fitness = None
self.loss = None #instant loss
self.tloss = None #total loss
self.loss_names = ['Loss']
self.csv = self.save_dir / 'results.csv'
self.plot_idx = [0, 1, 2]
# Callbacks
self.callbacks = _callbacks or callbacks.get_default_callbacks()
if RANK in (-1, 0):
callbacks.add_integration_callbacks(self)
위는 baseTrainer의 init 함수 부분이다. 따로 훈련을 진행하는 부분은 없으며, 훈련을 진행하는 동안 사용하게 되는 변수 초기화 및 cpu 혹은 gpu의 환경을 설정하는 부분이 포함되어 있다. 또한, task에 따라 올바른 데이터셋이 할당되어 있는 지를 체크하며, trainset과 testset으로 구분하여 기본 세팅을 한다.
train -> _do_train (device 장치가 정상적으로 작동 하는 경우) :
world size > 2 : _set_up_ddp
world size <= 2 : _set_up_train : 배치 사이즈 점검, 옵티마이저 세팅, 모델 형성
self.model = self.args.model (args에 저장된 모델 정보) , args는 cfg에 저장된 정보 : args는 overrides와 default args 더하기
#model.py
self.trainer.model = self.trainer.get_model(weights=self.model if self.ckpt else None, cfg=self.model.yaml)
self.model = self.trainer.model #모델은 trainer의 model에 해당됨
self.trainer.hub_session = self.session # attach optional HUB session
self.trainer.train()
trainer의 모델은 trainer에서 get_model 함수를 이용하여 정의할 수 있다.
trainer를 train을 시킨다. get_model 함수는 detect > train.py에서 정의되어 있다.
def get_model(self, cfg=None, weights=None, verbose=True):
"""Return a YOLO detection model."""
model = DetectionModel(cfg, nc=self.data['nc'], verbose=verbose and RANK == -1)
if weights:
model.load(weights)
return model
모델은 yolo detection model을 생성한다.
다시 이어서 _do_train 함수의 훈련 과정을 설명해보려 한다.
이 함수에서는 본격적으로 epoch 횟수에 따른 훈련을 진행하게 된다. 각 epoch 에 따른 시간 또한 재며, forward 과정과 backward 과정을 진행한 후, optimizer을 실행한다.
for epoch in range(self.start_epoch, self.epochs):
self.epoch = epoch #self.epoch 임시 저장
self.run_callbacks('on_train_epoch_start')
#모델 훈련 진행
self.model.train() #model이 뭐인가?? -> get_model로 정의를 했음 -> detectionModel 정의
if RANK != -1:
self.train_loader.sampler.set_epoch(epoch)
pbar = enumerate(self.train_loader)
# Update dataloader attributes (optional)
if epoch == (self.epochs - self.args.close_mosaic):
LOGGER.info('Closing dataloader mosaic')
if hasattr(self.train_loader.dataset, 'mosaic'):
self.train_loader.dataset.mosaic = False
if hasattr(self.train_loader.dataset, 'close_mosaic'):
self.train_loader.dataset.close_mosaic(hyp=self.args)
self.train_loader.reset()
if RANK in (-1, 0):
LOGGER.info(self.progress_string())
pbar = tqdm(enumerate(self.train_loader), total=nb, bar_format=TQDM_BAR_FORMAT)
self.tloss = None
self.optimizer.zero_grad()
for i, batch in pbar: #batch 개수만큼 train 시작
self.run_callbacks('on_train_batch_start')
# Warmup
ni = i + nb * epoch #nb : number batch * epoch 수 + i
if ni <= nw:
xi = [0, nw] # x interp
self.accumulate = max(1, np.interp(ni, xi, [1, self.args.nbs / self.batch_size]).round())
for j, x in enumerate(self.optimizer.param_groups):
# Bias lr falls from 0.1 to lr0, all other lrs rise from 0.0 to lr0
x['lr'] = np.interp(
ni, xi, [self.args.warmup_bias_lr if j == 0 else 0.0, x['initial_lr'] * self.lf(epoch)])
if 'momentum' in x:
x['momentum'] = np.interp(ni, xi, [self.args.warmup_momentum, self.args.momentum])
# Forward
with torch.cuda.amp.autocast(self.amp):
batch = self.preprocess_batch(batch)
self.loss, self.loss_items = self.model(batch)
if RANK != -1:
self.loss *= world_size
self.tloss = (self.tloss * i + self.loss_items) / (i + 1) if self.tloss is not None \
else self.loss_items
# Backward
self.scaler.scale(self.loss).backward() #backward 함수 실행
# Optimize - https://pytorch.org/docs/master/notes/amp_examples.html
if ni - last_opt_step >= self.accumulate:
self.optimizer_step() #옵티마이저 실행
last_opt_step = ni
# Log
mem = f'{torch.cuda.memory_reserved() / 1E9 if torch.cuda.is_available() else 0:.3g}G' # (GB)
loss_len = self.tloss.shape[0] if len(self.tloss.size()) else 1
losses = self.tloss if loss_len > 1 else torch.unsqueeze(self.tloss, 0)
if RANK in (-1, 0):
pbar.set_description(
('%11s' * 2 + '%11.4g' * (2 + loss_len)) %
(f'{epoch + 1}/{self.epochs}', mem, *losses, batch['cls'].shape[0], batch['img'].shape[-1])) #loss, batch 사이즈
self.run_callbacks('on_batch_end')
if self.args.plots and ni in self.plot_idx:
self.plot_training_samples(batch, ni)
self.run_callbacks('on_train_batch_end')
self.lr = {f'lr/pg{ir}': x['lr'] for ir, x in enumerate(self.optimizer.param_groups)} # for loggers
self.scheduler.step()
self.run_callbacks('on_train_epoch_end')
if RANK in (-1, 0):
# Validation
self.ema.update_attr(self.model, include=['yaml', 'nc', 'args', 'names', 'stride', 'class_weights'])
final_epoch = (epoch + 1 == self.epochs) or self.stopper.possible_stop
if self.args.val or final_epoch:
self.metrics, self.fitness = self.validate()
self.save_metrics(metrics={**self.label_loss_items(self.tloss), **self.metrics, **self.lr})
self.stop = self.stopper(epoch + 1, self.fitness)
# Save model
if self.args.save or (epoch + 1 == self.epochs):
self.save_model()
self.run_callbacks('on_model_save')
tnow = time.time()
self.epoch_time = tnow - self.epoch_time_start
self.epoch_time_start = tnow
self.run_callbacks('on_fit_epoch_end')
torch.cuda.empty_cache() # clears GPU vRAM at end of epoch, can help with out of memory errors
# Early Stopping
if RANK != -1: # if DDP training
broadcast_list = [self.stop if RANK == 0 else None]
dist.broadcast_object_list(broadcast_list, 0) # broadcast 'stop' to all ranks
if RANK != 0:
self.stop = broadcast_list[0]
if self.stop:
break # must break all DDP ranks
하지만, 이 코드에는 대략적인 epoch 훈련 과정만이 나타나있다. forward -> backward -> optimizer -> early stopping 등 한 epoch 과정에서 훈련이 어떠한 식으로 되어있는지만 나타나 있고, 구체적으로 데이터를 이용하여 훈련이 되는지는 나와 있지 않다. 아마도 저 close_mosaic과 관련이 있는 것으로 보인다. train_loader.sampler.set_epoch(epoch) 이 함수를 더 살펴봐야 될 것 같고, 다른 함수들 또한 코드들을 살펴봐야 될 것 같다.
중요한 부분은 각 epoch마다 훈련을 진행하는 부분이다.
self.model.train() 그렇다면 detectionModel의 train코드를 먼저 살펴봐야 겠다.
근데 train 코드가 없다.. 예상하기로는 저 detectionModel의 부모 클래스인 baseModel이 nn.Module을 상속받는다. 그리고 저 detectionModel은 다시 detect라는 클래스 객체를 생성하고, 이 객체 내에서 layer을 쌓는 것 같다.
class Detect(nn.Module):
"""YOLOv8 Detect head for detection models."""
dynamic = False # force grid reconstruction
export = False # export mode
shape = None
anchors = torch.empty(0) # init
strides = torch.empty(0) # init
def __init__(self, nc=80, ch=()): # detection layer
super().__init__()
self.nc = nc # number of classes
self.nl = len(ch) # number of detection layers
self.reg_max = 16 # DFL channels (ch[0] // 16 to scale 4/8/12/16/20 for n/s/m/l/x)
self.no = nc + self.reg_max * 4 # number of outputs per anchor
self.stride = torch.zeros(self.nl) # strides computed during build
c2, c3 = max((16, ch[0] // 4, self.reg_max * 4)), max(ch[0], self.nc) # channels
self.cv2 = nn.ModuleList(
nn.Sequential(Conv(x, c2, 3), Conv(c2, c2, 3), nn.Conv2d(c2, 4 * self.reg_max, 1)) for x in ch)
self.cv3 = nn.ModuleList(nn.Sequential(Conv(x, c3, 3), Conv(c3, c3, 3), nn.Conv2d(c3, self.nc, 1)) for x in ch)
self.dfl = DFL(self.reg_max) if self.reg_max > 1 else nn.Identity()
def bias_init(self):
"""Initialize Detect() biases, WARNING: requires stride availability."""
m = self # self.model[-1] # Detect() module
# cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1
# ncf = math.log(0.6 / (m.nc - 0.999999)) if cf is None else torch.log(cf / cf.sum()) # nominal class frequency
for a, b, s in zip(m.cv2, m.cv3, m.stride): # from
a[-1].bias.data[:] = 1.0 # box
b[-1].bias.data[:m.nc] = math.log(5 / m.nc / (640 / s) ** 2) # cls (.01 objects, 80 classes, 640 img)
bias_init 함수를 통해 detection class 객체가 초기화되어 생성되는 것으로 보여진다. detection Model에서 이 함수를 통해 class detect로 접근한다.
흠.. architecture를 비교해가며 다시 한번 공부해봐야 될 것 것 같다. 그리고 train 이후에 어떻게 클래스 확률 정보와 bounding box 정보 또한 저장되는 경로도 다시 공부해야 될 것 같다.