[YOLO] #4. YOLOv8 detection 코드 공부하기

임소현·2023년 6월 27일
0

YOLO

목록 보기
4/5

https://docs.ultralytics.com/tasks/detect/

다시 새롭게 시작하는 마음으로 공부하고자 한다..

yolov8.yaml

먼저, 위 파일에는 yolov8의 구조에 관한 설명이 나와있다.

backbone과 head로 구분이 되며, backbone에는 convolution layer와 c2f, sppf가 있으며, head에도 역시 c2f, convolution layer, 마지막 layer인 detect layer가 존재한다.

module > block.py

module > conv.py

module > head.py

(다음 포스팅에서 다룰 내용)

task.py

class DetectionModel(BaseModel):
    """YOLOv8 detection model."""

    def __init__(self, cfg='yolov8n.yaml', ch=3, nc=None, verbose=True):  # model, input channels, number of classes, weight는 yolov8n으로 default값
        super().__init__()
        self.yaml = cfg if isinstance(cfg, dict) else yaml_model_load(cfg)  # cfg dict -> 딕셔너리

        # Define model
        ch = self.yaml['ch'] = self.yaml.get('ch', ch)  # input channels #채널 값 얻어서 저장
        if nc and nc != self.yaml['nc']: #nc : number classes : 클래스 개수 저장
            LOGGER.info(f"Overriding model.yaml nc={self.yaml['nc']} with nc={nc}")
            self.yaml['nc'] = nc  # override yaml value
        self.model, self.save = parse_model(deepcopy(self.yaml), ch=ch, verbose=verbose)  # model, savelist
        self.names = {i: f'{i}' for i in range(self.yaml['nc'])}  # default names dict, names라는 딕셔너리에 각 클래스에 대한 라벨명 저장
        self.inplace = self.yaml.get('inplace', True)

        # Build strides
        m = self.model[-1]  # Detect() ? head의 detect 의미, yolo의 마지막 layer
        if isinstance(m, (Detect, Segment, Pose)): #m이 세가지 중 하나에 속하는 경우
            s = 256  # 2x min stride
            m.inplace = self.inplace
            forward = lambda x: self.forward(x)[0] if isinstance(m, (Segment, Pose)) else self.forward(x) #m이 segment, pose 중 하나에 속하는 경우
            m.stride = torch.tensor([s / x.shape[-2] for x in forward(torch.zeros(1, ch, s, s))])  # forward -> 채널 수 x grid x grid
            self.stride = m.stride
            m.bias_init()  # only run once
        else: #m이 classifier일 경우
            self.stride = torch.Tensor([32])  # default stride for i.e. RTDETR

        # Init weights, biases
        initialize_weights(self) #weight 값 초기화
        if verbose:
            self.info()
            LOGGER.info('')

    def _predict_augment(self, x):
        """Perform augmentations on input image x and return augmented inference and train outputs."""
        img_size = x.shape[-2:]  # height, width
        s = [1, 0.83, 0.67]  # scales
        f = [None, 3, None]  # flips (2-ud, 3-lr)
        y = []  # outputs
        for si, fi in zip(s, f):
            xi = scale_img(x.flip(fi) if fi else x, si, gs=int(self.stride.max()))
            yi = super().predict(xi)[0]  # forward
            # cv2.imwrite(f'img_{si}.jpg', 255 * xi[0].cpu().numpy().transpose((1, 2, 0))[:, :, ::-1])  # save
            yi = self._descale_pred(yi, fi, si, img_size) #descale pred
            y.append(yi)
        y = self._clip_augmented(y)  # clip augmented tails
        return torch.cat(y, -1), None  # augmented inference, train

    @staticmethod
    def _descale_pred(p, flips, scale, img_size, dim=1):
        """De-scale predictions following augmented inference (inverse operation)."""
        p[:, :4] /= scale  # de-scale
        x, y, wh, cls = p.split((1, 1, 2, p.shape[dim] - 4), dim)
        if flips == 2:
            y = img_size[0] - y  # de-flip ud
        elif flips == 3:
            x = img_size[1] - x  # de-flip lr
        return torch.cat((x, y, wh, cls), dim)

    def _clip_augmented(self, y):
        """Clip YOLOv5 augmented inference tails."""
        nl = self.model[-1].nl  # number of detection layers (P3-P5)
        g = sum(4 ** x for x in range(nl))  # grid points
        e = 1  # exclude layer count
        i = (y[0].shape[-1] // g) * sum(4 ** x for x in range(e))  # indices
        y[0] = y[0][..., :-i]  # large
        i = (y[-1].shape[-1] // g) * sum(4 ** (nl - 1 - x) for x in range(e))  # indices
        y[-1] = y[-1][..., i:]  # small
        return y

    def init_criterion(self): #손실 함수 초기화
        return v8DetectionLoss(self)

engine > trainer.py

task가 detect인 경우, detect에 대한 detectionModel이 model.py 파일에서 생성된다. 그 후 detectionModel에 대한 detectionTrainer를 추가적으로 만들어 task detect에 대한 모델 훈련을 진행하게 된다. 이때, 각 task에 대한 trainer을 생성할 때, 4가지의 task trainer에 대한 baseTrainer가 부모 클래스로 상속하게 된다. 이때 baseTrainer가 trainer.py 파일에 저장되어 있다. 따라서, baseTrainer에 관한 코드를 이해해야만 train 과정을 이해할 수 있다.

 def __init__(self, cfg=DEFAULT_CFG, overrides=None, _callbacks=None): #weight : cfg
        """
        Initializes the BaseTrainer class.

        Args:
            cfg (str, optional): Path to a configuration file. Defaults to DEFAULT_CFG. #weight file
            overrides (dict, optional): Configuration overrides. Defaults to None.
        """
        self.args = get_cfg(cfg, overrides) #overrides에서 cfg weight 얻기
        self.device = select_device(self.args.device, self.args.batch)
        self.check_resume()
        self.validator = None
        self.model = None
        self.metrics = None
        self.plots = {}
        init_seeds(self.args.seed + 1 + RANK, deterministic=self.args.deterministic)

        # Dirs
        project = self.args.project or Path(SETTINGS['runs_dir']) / self.args.task
        name = self.args.name or f'{self.args.mode}'
        if hasattr(self.args, 'save_dir'):
            self.save_dir = Path(self.args.save_dir)
        else:
            self.save_dir = Path(
                increment_path(Path(project) / name, exist_ok=self.args.exist_ok if RANK in (-1, 0) else True))
        self.wdir = self.save_dir / 'weights'  # weights dir
        if RANK in (-1, 0):
            self.wdir.mkdir(parents=True, exist_ok=True)  # make dir
            self.args.save_dir = str(self.save_dir)
            yaml_save(self.save_dir / 'args.yaml', vars(self.args))  # save run args
        self.last, self.best = self.wdir / 'last.pt', self.wdir / 'best.pt'  # checkpoint paths
        self.save_period = self.args.save_period

        self.batch_size = self.args.batch #args에 저장된 batch 사이즈
        self.epochs = self.args.epochs #args에 저장된 epochs 수
        self.start_epoch = 0 #start epoch 0으로 설정
        if RANK == -1:
            print_args(vars(self.args))

        # Device
        if self.device.type == 'cpu':
            self.args.workers = 0  # faster CPU training as time dominated by inference, not dataloading

        # Model and Dataset
        self.model = self.args.model #모델 저장
        try:
            if self.args.task == 'classify':
                self.data = check_cls_dataset(self.args.data)
            elif self.args.data.endswith('.yaml') or self.args.task in ('detect', 'segment'):
                self.data = check_det_dataset(self.args.data) #detect dataset인지 체크 후 데이터에 저장
                if 'yaml_file' in self.data: #데이터셋에 yaml file이 있는 경우
                    self.args.data = self.data['yaml_file']  # for validating 'yolo train data=url.zip' usage
        except Exception as e:
            raise RuntimeError(emojis(f"Dataset '{clean_url(self.args.data)}' error ❌ {e}")) from e

        self.trainset, self.testset = self.get_dataset(self.data) #데이터셋을 trainset과 testset으로 나누기
        self.ema = None

        # Optimization utils init
        self.lf = None
        self.scheduler = None

        # Epoch level metrics, 변수값 None으로 초기화
        self.best_fitness = None
        self.fitness = None
        self.loss = None #instant loss
        self.tloss = None #total loss
        self.loss_names = ['Loss']
        self.csv = self.save_dir / 'results.csv'
        self.plot_idx = [0, 1, 2]

        # Callbacks
        self.callbacks = _callbacks or callbacks.get_default_callbacks()
        if RANK in (-1, 0):
            callbacks.add_integration_callbacks(self)

위는 baseTrainer의 init 함수 부분이다. 따로 훈련을 진행하는 부분은 없으며, 훈련을 진행하는 동안 사용하게 되는 변수 초기화 및 cpu 혹은 gpu의 환경을 설정하는 부분이 포함되어 있다. 또한, task에 따라 올바른 데이터셋이 할당되어 있는 지를 체크하며, trainset과 testset으로 구분하여 기본 세팅을 한다.

train -> _do_train (device 장치가 정상적으로 작동 하는 경우) :
world size > 2 : _set_up_ddp
world size <= 2 : _set_up_train : 배치 사이즈 점검, 옵티마이저 세팅, 모델 형성

self.model = self.args.model (args에 저장된 모델 정보) , args는 cfg에 저장된 정보 : args는 overrides와 default args 더하기

#model.py
self.trainer.model = self.trainer.get_model(weights=self.model if self.ckpt else None, cfg=self.model.yaml)
self.model = self.trainer.model #모델은 trainer의 model에 해당됨
self.trainer.hub_session = self.session  # attach optional HUB session
self.trainer.train()

trainer의 모델은 trainer에서 get_model 함수를 이용하여 정의할 수 있다.
trainer를 train을 시킨다. get_model 함수는 detect > train.py에서 정의되어 있다.

    def get_model(self, cfg=None, weights=None, verbose=True):
        """Return a YOLO detection model."""
        model = DetectionModel(cfg, nc=self.data['nc'], verbose=verbose and RANK == -1)
        if weights:
            model.load(weights)
        return model

모델은 yolo detection model을 생성한다.

  • 모델 생성 코드 공부

다시 이어서 _do_train 함수의 훈련 과정을 설명해보려 한다.
이 함수에서는 본격적으로 epoch 횟수에 따른 훈련을 진행하게 된다. 각 epoch 에 따른 시간 또한 재며, forward 과정과 backward 과정을 진행한 후, optimizer을 실행한다.

for epoch in range(self.start_epoch, self.epochs):
            self.epoch = epoch #self.epoch 임시 저장 
            self.run_callbacks('on_train_epoch_start')
            #모델 훈련 진행
            self.model.train() #model이 뭐인가?? -> get_model로 정의를 했음 -> detectionModel 정의
            if RANK != -1:
                self.train_loader.sampler.set_epoch(epoch)
            pbar = enumerate(self.train_loader)
            # Update dataloader attributes (optional)
            if epoch == (self.epochs - self.args.close_mosaic):
                LOGGER.info('Closing dataloader mosaic')
                if hasattr(self.train_loader.dataset, 'mosaic'):
                    self.train_loader.dataset.mosaic = False
                if hasattr(self.train_loader.dataset, 'close_mosaic'):
                    self.train_loader.dataset.close_mosaic(hyp=self.args)
                self.train_loader.reset()

            if RANK in (-1, 0):
                LOGGER.info(self.progress_string())
                pbar = tqdm(enumerate(self.train_loader), total=nb, bar_format=TQDM_BAR_FORMAT)
            self.tloss = None
            self.optimizer.zero_grad()
            for i, batch in pbar: #batch 개수만큼 train 시작
                self.run_callbacks('on_train_batch_start')
                # Warmup
                ni = i + nb * epoch #nb : number batch * epoch 수 + i
                if ni <= nw:
                    xi = [0, nw]  # x interp
                    self.accumulate = max(1, np.interp(ni, xi, [1, self.args.nbs / self.batch_size]).round())
                    for j, x in enumerate(self.optimizer.param_groups):
                        # Bias lr falls from 0.1 to lr0, all other lrs rise from 0.0 to lr0
                        x['lr'] = np.interp(
                            ni, xi, [self.args.warmup_bias_lr if j == 0 else 0.0, x['initial_lr'] * self.lf(epoch)])
                        if 'momentum' in x:
                            x['momentum'] = np.interp(ni, xi, [self.args.warmup_momentum, self.args.momentum])

                # Forward
                with torch.cuda.amp.autocast(self.amp):
                    batch = self.preprocess_batch(batch)
                    self.loss, self.loss_items = self.model(batch)
                    if RANK != -1:
                        self.loss *= world_size
                    self.tloss = (self.tloss * i + self.loss_items) / (i + 1) if self.tloss is not None \
                        else self.loss_items

                # Backward
                self.scaler.scale(self.loss).backward() #backward 함수 실행

                # Optimize - https://pytorch.org/docs/master/notes/amp_examples.html
                if ni - last_opt_step >= self.accumulate:
                    self.optimizer_step() #옵티마이저 실행
                    last_opt_step = ni

                # Log
                mem = f'{torch.cuda.memory_reserved() / 1E9 if torch.cuda.is_available() else 0:.3g}G'  # (GB)
                loss_len = self.tloss.shape[0] if len(self.tloss.size()) else 1
                losses = self.tloss if loss_len > 1 else torch.unsqueeze(self.tloss, 0)
                if RANK in (-1, 0):
                    pbar.set_description(
                        ('%11s' * 2 + '%11.4g' * (2 + loss_len)) %
                        (f'{epoch + 1}/{self.epochs}', mem, *losses, batch['cls'].shape[0], batch['img'].shape[-1])) #loss, batch 사이즈
                    self.run_callbacks('on_batch_end')
                    if self.args.plots and ni in self.plot_idx:
                        self.plot_training_samples(batch, ni)

                self.run_callbacks('on_train_batch_end')

            self.lr = {f'lr/pg{ir}': x['lr'] for ir, x in enumerate(self.optimizer.param_groups)}  # for loggers

            self.scheduler.step()
            self.run_callbacks('on_train_epoch_end')

            if RANK in (-1, 0):

                # Validation
                self.ema.update_attr(self.model, include=['yaml', 'nc', 'args', 'names', 'stride', 'class_weights'])
                final_epoch = (epoch + 1 == self.epochs) or self.stopper.possible_stop

                if self.args.val or final_epoch:
                    self.metrics, self.fitness = self.validate()
                self.save_metrics(metrics={**self.label_loss_items(self.tloss), **self.metrics, **self.lr})
                self.stop = self.stopper(epoch + 1, self.fitness)

                # Save model
                if self.args.save or (epoch + 1 == self.epochs):
                    self.save_model()
                    self.run_callbacks('on_model_save')

            tnow = time.time()
            self.epoch_time = tnow - self.epoch_time_start
            self.epoch_time_start = tnow
            self.run_callbacks('on_fit_epoch_end')
            torch.cuda.empty_cache()  # clears GPU vRAM at end of epoch, can help with out of memory errors

            # Early Stopping
            if RANK != -1:  # if DDP training
                broadcast_list = [self.stop if RANK == 0 else None]
                dist.broadcast_object_list(broadcast_list, 0)  # broadcast 'stop' to all ranks
                if RANK != 0:
                    self.stop = broadcast_list[0]
            if self.stop:
                break  # must break all DDP ranks

하지만, 이 코드에는 대략적인 epoch 훈련 과정만이 나타나있다. forward -> backward -> optimizer -> early stopping 등 한 epoch 과정에서 훈련이 어떠한 식으로 되어있는지만 나타나 있고, 구체적으로 데이터를 이용하여 훈련이 되는지는 나와 있지 않다. 아마도 저 close_mosaic과 관련이 있는 것으로 보인다. train_loader.sampler.set_epoch(epoch) 이 함수를 더 살펴봐야 될 것 같고, 다른 함수들 또한 코드들을 살펴봐야 될 것 같다.

중요한 부분은 각 epoch마다 훈련을 진행하는 부분이다.
self.model.train() 그렇다면 detectionModel의 train코드를 먼저 살펴봐야 겠다.

근데 train 코드가 없다.. 예상하기로는 저 detectionModel의 부모 클래스인 baseModel이 nn.Module을 상속받는다. 그리고 저 detectionModel은 다시 detect라는 클래스 객체를 생성하고, 이 객체 내에서 layer을 쌓는 것 같다.

class Detect(nn.Module):
    """YOLOv8 Detect head for detection models."""
    dynamic = False  # force grid reconstruction
    export = False  # export mode
    shape = None
    anchors = torch.empty(0)  # init
    strides = torch.empty(0)  # init

    def __init__(self, nc=80, ch=()):  # detection layer
        super().__init__()
        self.nc = nc  # number of classes
        self.nl = len(ch)  # number of detection layers
        self.reg_max = 16  # DFL channels (ch[0] // 16 to scale 4/8/12/16/20 for n/s/m/l/x)
        self.no = nc + self.reg_max * 4  # number of outputs per anchor
        self.stride = torch.zeros(self.nl)  # strides computed during build
        c2, c3 = max((16, ch[0] // 4, self.reg_max * 4)), max(ch[0], self.nc)  # channels
        self.cv2 = nn.ModuleList(
            nn.Sequential(Conv(x, c2, 3), Conv(c2, c2, 3), nn.Conv2d(c2, 4 * self.reg_max, 1)) for x in ch)
        self.cv3 = nn.ModuleList(nn.Sequential(Conv(x, c3, 3), Conv(c3, c3, 3), nn.Conv2d(c3, self.nc, 1)) for x in ch)
        self.dfl = DFL(self.reg_max) if self.reg_max > 1 else nn.Identity()
        
        def bias_init(self):
        """Initialize Detect() biases, WARNING: requires stride availability."""
        m = self  # self.model[-1]  # Detect() module
        # cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1
        # ncf = math.log(0.6 / (m.nc - 0.999999)) if cf is None else torch.log(cf / cf.sum())  # nominal class frequency
        for a, b, s in zip(m.cv2, m.cv3, m.stride):  # from
            a[-1].bias.data[:] = 1.0  # box
            b[-1].bias.data[:m.nc] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)

bias_init 함수를 통해 detection class 객체가 초기화되어 생성되는 것으로 보여진다. detection Model에서 이 함수를 통해 class detect로 접근한다.

흠.. architecture를 비교해가며 다시 한번 공부해봐야 될 것 것 같다. 그리고 train 이후에 어떻게 클래스 확률 정보와 bounding box 정보 또한 저장되는 경로도 다시 공부해야 될 것 같다.

0개의 댓글