위의 글과 이어진다.
지난번엔 @Scheduled 를 곧바로 스케줄러가 실행되는 runJob 에 걸어서 사용했었다.
@Scheduled(cron = "0/10 * * * * *") // 10초마다 실행
public void runJob() {
// logic
}
그런데 배치의 개수가 2개 이상이고, 각자 다른 스케줄을 걸어주어야 하는 상황이라면 ? 기존 코드대로라면 이에 대응하기가 어려울 것이다.
그래서 코드를 다음과 같이 수정해보기로 했다.
1) 각 배치마다 스케줄링을 다르게 할 수 있을 것
2) 특정 배치의 스케줄링은 동적으로 변경 할 수 있을 것
매달 수행해야 하는 배치의 날짜를 DB에서 읽어와서 적용하는 경우 동적 스케줄링이 필요할 것이다.
일단 동적 스케줄링을 위한 새로운 job을 추가하고, 그에 맞게 추상화 작업도 진행할 예정이다.
spring:
datasource:
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mariadb://localhost:3306/mybatch?autoReconnect=true&characterEncoding=UTF-8&allowMultiQueries=true&allowPublicKeyRetrieval=true&useSSL=false
username: username
password: password
batch:
job:
enabled: false # 서버 재시작시 배치 자동실행 방지
jdbc:
initialize-schema: always
job-setting:
hello:
name: "helloJob"
enabled: true
cron: "30 * * * * ?" # 30초 일때만 실행
dynamic:
name: "dynamicJob"
enabled: true
yml 파일을 조금 수정해줬다. Job 이름을 하드코딩 하는 대신, yml 파일에 등록해놓고 @Value 를 통해 주입하는 방식을 사용할 것이다.
job 이라는 폴더를 만들어서, 이전의 BatchConfig.java 파일을 해당 폴더로 옮겨준다. BatchConfig 클래스는 이제부터 인터페이스로 추상화 될 것이다.
import org.springframework.batch.core.Job;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.transaction.PlatformTransactionManager;
public interface BatchConfig {
String getJobName();
boolean isJobEnabled();
String getCronExpression();
void setCronExpression(String cronExpression);
Job createJob(JobRepository jobRepository, PlatformTransactionManager transactionManager);
}
Step과 Tasklet은 한 Job에서 여러개를 구현할 수도 있으니 굳이 추상화 시키진 않았다.
기존에 testJob 으로 구현했던 부분이다. BatchConfig 를 implements 해준다.
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.support.DefaultBatchConfiguration;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@Data
@RequiredArgsConstructor
@Slf4j
public class HelloBatchJob extends DefaultBatchConfiguration implements BatchConfig {
@Value("${job-setting.hello.cron}")
private String cronExpression;
@Value("${job-setting.hello.enabled}")
private boolean isJobEnabled;
@Value("${job-setting.hello.name}")
private String jobName;
@Bean("hello")
@Override
public Job createJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
Job job = new JobBuilder(getJobName(),jobRepository)
.incrementer(new RunIdIncrementer())
.start(executeStep(jobRepository,transactionManager))
.build();
return job;
}
private Step executeStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
Step step = new StepBuilder("helloStep",jobRepository)
.allowStartIfComplete(true)
.tasklet(helloTasklet(),transactionManager)
.build();
return step;
}
private Tasklet helloTasklet(){
return ((contribution, chunkContext) -> {
log.info("***** hello batch! *****");
return RepeatStatus.FINISHED;
});
}
}
구현체가 늘어날 수록 createJob 이 여러개가 될 것이기 때문에
@Bean만 있어선 Spring boot 에서 인식하지 못하니, 구분할 수 있게끔 @Bean("hello") 라고 해 주었다.
취향껏 하자.
동적 스케줄링을 적용할 job
DB 연결하기 귀찮고 관련 테이블 예제도 안만들어서 간단하게 cron식을 저장할 배열을 만들고 거기서 랜덤으로 cron식을 골라오도록 했다.
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
@Configuration
@Data
@RequiredArgsConstructor
@Slf4j
public class DynamicBatchJob implements BatchConfig{
private String cronExpression;
@Value("${job-setting.dynamic.enabled}")
private boolean isJobEnabled;
@Value("${job-setting.dynamic.name}")
private String jobName;
private static final String[] cronExpressions = {
// 동적 cron 변경을 위한 임시 cron 식 변수
"0 * * * * ?",
"10 * * * * ?",
"20 * * * * ?",
"30 * * * * ?",
"40 * * * * ?",
"50 * * * * ?",
};
@Bean("dynamic")
@Override
public Job createJob(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
Job job = new JobBuilder(getJobName(),jobRepository)
.incrementer(new RunIdIncrementer())
.start(executeStep(jobRepository,transactionManager))
.build();
return job;
}
private Step executeStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
Step step = new StepBuilder("dynamicStep",jobRepository)
.allowStartIfComplete(true)
.tasklet(dynamicTasklet(),transactionManager)
.build();
return step;
}
private Tasklet dynamicTasklet(){
return ((contribution, chunkContext) -> {
log.info("***** dynamic scheduling batch *****");
return RepeatStatus.FINISHED;
});
}
public String getRandomCronExpression(){
Random random = new Random();
List<String> crons = Arrays.asList(cronExpressions);
String cron = crons.get(random.nextInt(crons.size()));
return cron;
}
}
scheduler 폴더를 만든다.
기존의 BatchScheduler는 추상 클래스가 될 것이다.
BatchConfig를 구현한 구현체의 jobName에 따라 유연하게 동작할 수 있게끔 수정해 두었다.
import com.example.batch.Job.BatchConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Slf4j
@RequiredArgsConstructor
@Component
public abstract class BatchScheduler {
protected ThreadPoolTaskScheduler scheduler;
protected final JobLauncher jobLauncher;
protected final JobRegistry jobRegistry;
protected final BatchConfig batchConfig;
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(){
JobRegistryBeanPostProcessor jobProcessor = new JobRegistryBeanPostProcessor();
jobProcessor.setJobRegistry(jobRegistry);
return jobProcessor;
}
protected void startScheduler(){
log.info("================================================");
log.info(">>> [{}] START",batchConfig.getJobName());
scheduler = new ThreadPoolTaskScheduler();
scheduler.initialize();
log.info("[{}] cron: {}",batchConfig.getJobName(),batchConfig.getCronExpression());
CronTrigger cronTrigger = new CronTrigger(batchConfig.getCronExpression());
scheduler.schedule(runJob(),cronTrigger);
}
protected void stopScheduler(){
log.info(">>> [{}] STOP",batchConfig.getJobName());
if(scheduler == null){
return ;
}
scheduler.shutdown();
}
public void updateCronExpression(String newCron) {
stopScheduler();
batchConfig.setCronExpression(newCron);
startScheduler();
}
private Runnable runJob() {
return () -> {
launch(batchConfig.getJobName());
};
}
private void launch(String jobName){
String time = LocalDateTime.now().toString();
try {
Job job = jobRegistry.getJob(jobName);
JobParametersBuilder jobParam = new JobParametersBuilder().addString("time", time);
jobLauncher.run(job, jobParam.toJobParameters());
} catch (
NoSuchJobException|
JobInstanceAlreadyCompleteException |
JobExecutionAlreadyRunningException |
JobParametersInvalidException |
JobRestartException e
) {
throw new RuntimeException(e);
}
}
}
ThreadPoolTaskScheduler 는 스레드 풀에 job을 자동으로 등록하고 관리해주는 역할을 한다.
👉 영어 싫을까봐 한국 블로그도 넣어놨다.
https://docs.spring.io/spring-framework/docs/4.3.x/spring-framework-reference/html/scheduling.html
https://velog.io/@rara_kim/Spring-Thread-Pool-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0feat.-Scheduler
import com.example.batch.Job.HelloBatchJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class HelloBatchScheduler extends BatchScheduler{
public HelloBatchScheduler(
JobLauncher jobLauncher,
JobRegistry jobRegistry,
HelloBatchJob helloBatchJob
) {
super(jobLauncher, jobRegistry,helloBatchJob);
if(helloBatchJob.isJobEnabled()){
startScheduler();
}
}
}
import com.example.batch.Job.DynamicBatchJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DynamicBatchScheduler extends BatchScheduler {
public DynamicBatchScheduler(JobLauncher jobLauncher,
JobRegistry jobRegistry,
DynamicBatchJob dynamicBatchJob) {
super(jobLauncher, jobRegistry, dynamicBatchJob);
String cron = dynamicBatchJob.getRandomCronExpression();
dynamicBatchJob.setCronExpression(cron);
if(dynamicBatchJob.isJobEnabled()){
startScheduler();
}
}
}
각 Job에 대한 스케줄러는 BatchScheduler를 상속받고, 생성자를 구현해주는 것으로 끝난다.
동적 스케줄링을 위해 주기적으로 cron 식을 업데이트 시켜줄 service 로직이다.
import com.example.batch.Job.DynamicBatchJob;
import com.example.batch.scheduler.DynamicBatchScheduler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
@EnableScheduling
@Slf4j
public class UpdateScheduleService {
private final DynamicBatchScheduler dynamicBatchScheduler;
private final DynamicBatchJob dynamicBatchJob;
@Scheduled(cron = "59 * * * * ?") // 59 초 마다 cron 업데이트 실행
public void updateCronInDynamicBatch(){
if(dynamicBatchJob.isJobEnabled()){
log.info("run update schedule service");
String updatedCron = dynamicBatchJob.getRandomCronExpression();
log.info("updated cron: {}",updatedCron);
dynamicBatchScheduler.updateCronExpression(updatedCron);
}
}
}
DynamicBatchScheduler의 updateCronExpression 메서드가 59초마다 호출되면서 cron식을 변경한다. 이 서비스 로직의 주기를 어떻게 설정하냐에 따라 한달에 한번, 일주일에 한번씩 cron식의 업데이트를 수행할 것이다.
처음에 dynamicJob 은 10초에 실행되도록 설정 되었다.
59초에 cron 업데이트가 수행되고, 기존의 job 이 멈춘다.
바뀐 cron식 대로 0초에 dynamicJob이 수행되는 것을 확인할 수 있다.
40초로 변경된 이후에도 정상적으로 수행함을 확인할 수 있었다.
랜덤으로 cron식을 설정하는 부분만 DB에서 스케줄을 읽어오도록 변경한다면 동적 스케줄링을 무리없이 구현할 수 있을것이다.