@Scheduled 코드 분석하기

mylime·2024년 6월 20일
0
post-thumbnail

이 포스팅은 2024.06.21에 작성되었습니다.



가장 간단한 코드

분석해볼 아주 간단한 코드를 만들어보겠다. (@Scheduled 사용법이 사실 간단하다)

@EnableScheduling
@SpringBootApplication
public class MiniPayApplication {
    public static void main(String[] args) {
        SpringApplication.run(MiniPayApplication.class, args);
    }

}

SpringBootApplication에 @EnableScheduling 어노테이션을 붙여준다.


@Slf4j
@Component
public class ScheduledTasks {
    @Scheduled(cron = "0/1 * * * * *")
    public void reportCurrentTime() {
        log.info("hi");
    }
}

@Scheduled 어노테이션으로 주기적으로 실행할 메서드를 지정해준다.
cron도 사용하여 1초마다 실행되도록 하였다.


분석

@EnableScheduling 코드

@Target(ElementType.TYPE) //클래스, 인터페이스, 열거형에 적용 가능
@Retention(RetentionPolicy.RUNTIME) //런타임동안 유지됨
@Import(SchedulingConfiguration.class) //SchedulingConfiguration 클래스의 설정을 가져옴
@Documented
public @interface EnableScheduling {

}

EnableScheduling은 SchedulingConfiguration 클래스의 설정을 가져온다.
SchedulingConfiguration 클래스가 뭔지 살펴보자.


SchedulingConfiguration 클래스

@Configuration(proxyBeanMethods = false) //@Bean 메서드 호출 시 프록시 사용x
@Role(BeanDefinition.ROLE_INFRASTRUCTURE) //스프링의 infrastructure 역할을 함
public class SchedulingConfiguration {

	@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
		return new ScheduledAnnotationBeanPostProcessor();
	}

}

여기서는 ScheduledAnnotationBeanPostProcessor를 빈으로 등록해주고 있다.


ScheduledAnnotationBeanPostProcessor 클래스 생성자

public ScheduledAnnotationBeanPostProcessor() {
	this.registrar = new ScheduledTaskRegistrar();
}

registrar로 ScheduledTaskRegistrar클래스 객체를 등록하는 것을 볼 수 있다.


ScheduledAnnotationBeanPostProcessor의 postProcessAfterInitialization 메서드

초기화된 후 실행되는 메서드에서는
(1) @Scheduled가 달린 메서드를 스캔하여 annotationMethods map에 저장한다.
(2) 이후 annotationMethods에 포함된 메서드들을 processScheduled 메서드로 실행시킨다. (스케줄러에 등록하는 것)


processScheduled 메서드

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    // Is the method a Kotlin suspending function? Throws if true and the reactor bridge isn't on the classpath.
    // Does the method return a reactive type? Throws if true and it isn't a deferred Publisher type.
    if (reactiveStreamsPresent && ScheduledAnnotationReactiveSupport.isReactive(method)) {
        processScheduledAsync(scheduled, method, bean);
        return;
    }
    processScheduledSync(scheduled, method, bean);
}

processScheduledSync를 타고 들어가면 processScheduledTask 메서드를 호출하는 것을 볼 수 있다.

processScheduledTask 메서드에서는 cron식이나 fixedDelay 등에 대해 제대로 입력했는지 유효성 검사를 한다.

cron 식을 사용했다면 registrar registrar의 scheduleCronTask를 호출하여 cronTask를 등록한 ScheduledTask를 tasks에 추가한다.


ScheduledTaskRegistrar 클래스

public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {
	public static final String CRON_DISABLED = "-";

	@Nullable
	private TaskScheduler taskScheduler;
    
    ...
    
	@Nullable
	public ScheduledTask scheduleCronTask(CronTask task) {
		ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
		boolean newTask = false;
		if (scheduledTask == null) {
			scheduledTask = new ScheduledTask(task);
			newTask = true;
		}
		if (this.taskScheduler != null) {
			scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
		}
		else {
			addCronTask(task);
			this.unresolvedTasks.put(task, scheduledTask);
		}
		return (newTask ? scheduledTask : null);
	}
}

scheduleCronTask에서는 TaskScheduler의 schedule을 호출하여 주기적으로 실행되어야할 task를 등록해준다.

TaskScheduler의 설명은 다음과 같다.

Task scheduler interface that abstracts the scheduling of Runnables based on different kinds of triggers.
This interface is separate from SchedulingTaskExecutor since it usually represents a different kind of backend, i.e. a thread pool with different characteristics and capabilities. Implementations may implement both interfaces if they can handle both kinds of execution characteristics.
The 'default' implementation is org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler, wrapping a native java.util.concurrent.ScheduledExecutorService and adding extended trigger capabilities.

TaskScheduler는 다양한 종류의 trigger를 기반으로 Runnable의 스케줄링을 추상화하는 인터페이스이고, 기본 구현은 ThreadPoolTaskScheduler클래스이며, java.util.concurrent.ScheduledExecutorService를 래핑하고 확장된 트리거 기능을 추가했다고 한다.

구현체인 ThreadPoolTaskScheduler 클래스의 schedule 메서드를 따라가보자


ThreadPoolTaskScheduler 클래스

@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
    ScheduledExecutorService executor = getScheduledExecutor();
    try {
        ErrorHandler errorHandler = this.errorHandler;
        if (errorHandler == null) {
            errorHandler = TaskUtils.getDefaultErrorHandler(true);
        }
        return new ReschedulingRunnable(task, trigger, this.clock, executor, errorHandler).schedule();
    }
    catch (RejectedExecutionException ex) {
        throw new TaskRejectedException(executor, task, ex);
    }
}

여기서는 ReschedulingRunnable 객체를 만들어 schedule 메서드를 실행시킨다. schedule 메서드를 따라가보자


ReschedulingRunnable

private final ScheduledExecutorService executor;

...

@Nullable
public ScheduledFuture<?> schedule() {
    synchronized (this.triggerContextMonitor) {
        this.scheduledExecutionTime = this.trigger.nextExecution(this.triggerContext);
        if (this.scheduledExecutionTime == null) {
            return null;
        }
        
        //delay를 계산하여 executor의 schedule호출
        Duration delay = Duration.between(this.triggerContext.getClock().instant(), this.scheduledExecutionTime);
        this.currentFuture = this.executor.schedule(this, delay.toNanos(), TimeUnit.NANOSECONDS);
        return this;
    }
}

...

@Override
public void run() {
    Instant actualExecutionTime = this.triggerContext.getClock().instant();
    super.run(); //실제 schedule 메서드 수행
    
    Instant completionTime = this.triggerContext.getClock().instant();
    synchronized (this.triggerContextMonitor) {
        Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
        this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
        
        //실행이 끝나면 다시 schedule()호출
        if (!obtainCurrentFuture().isCancelled()) {
            schedule();
        }
    }
}

schedule메서드에서는 delay를 계산하여, 해당 시간이 지나면 시작하는 작업을 executor.schedule 메서드에 등록한다.

이후에 executor는 delay 후 쓰레드를 할당해 run()메서드를 실행하는데, 실행이 끝나면 다시 RescheduleRunnable의 schedule 메서드를 호출해 작업을 executor에 재등록한다.

이런식으로 작업이 무한히 진행이 된다.



요약

  1. @EnableScheduling 어노테이션에서는 SchedulingConfiguration에 등록된 설정을 가져온다. 스프링 서버 시작 시, ScheduledAnnotationBeanPostProcessor가 생성되어 빈으로 등록된다.
  2. ScheduledAnnotationBeanPostProcessor는 초기화된 후 @Scheduled가 붙은 메서드를 스캔하여 map에 저장하고 processScheduled 메서드를 실행시킴
  3. processScheduledTask 메서드를 통해 cron, fixedDelay 등의 옵션에 따라 알맞은 Task가 진행된다
  4. 이후 여러 클래스들을 거쳐 TaskScheduler에 주기적으로 실행할 task들을 등록한다. (schedule 메서드 사용)
  5. 원하는 Delay가 지나면 executor(ScheduledExecutorService)가 쓰레드풀에서 쓰레드를 할당하여 run()메서드 실행. 실행이 끝나면 다시 schedule()메서드를 호출하여 작업을 executor에 등록
  6. 무한 반복



참고자료

https://velog.io/@sunaookamisiroko/spring-scheduler-%EC%9E%91%EB%8F%99-%EC%9B%90%EB%A6%AC
https://pompitzz.github.io/blog/Spring/Scheduler.html#_1-scheduledannotationbeanpostprocessor-%E1%84%83%E1%85%B3%E1%86%BC%E1%84%85%E1%85%A9%E1%86%A8

profile
깊게 탐구하는 것을 좋아하는 백엔드 개발자 지망생 lime입니다! 게시글에 틀린 정보가 있다면 지적해주세요. 감사합니다. 이전블로그 주소: https://fladi.tistory.com/

0개의 댓글