[Spring boot] @Scheduled를 통해 배치 동적 스케줄링 하기 (Spring batch 5.0) - 2

dev asdf·2023년 11월 19일
0

Spring Boot

목록 보기
2/6

https://velog.io/@asdf-dev/spring-boot-batch

위의 글과 이어진다.
지난번엔 @Scheduled 를 곧바로 스케줄러가 실행되는 runJob 에 걸어서 사용했었다.

 @Scheduled(cron = "0/10 * * * * *") // 10초마다 실행
    public void runJob() {
    	// logic
    }

그런데 배치의 개수가 2개 이상이고, 각자 다른 스케줄을 걸어주어야 하는 상황이라면 ? 기존 코드대로라면 이에 대응하기가 어려울 것이다.

그래서 코드를 다음과 같이 수정해보기로 했다.

1) 각 배치마다 스케줄링을 다르게 할 수 있을 것
2) 특정 배치의 스케줄링은 동적으로 변경 할 수 있을 것

매달 수행해야 하는 배치의 날짜를 DB에서 읽어와서 적용하는 경우 동적 스케줄링이 필요할 것이다.

일단 동적 스케줄링을 위한 새로운 job을 추가하고, 그에 맞게 추상화 작업도 진행할 예정이다.


application.yml

spring:
  datasource:
    driver-class-name: org.mariadb.jdbc.Driver
    url: jdbc:mariadb://localhost:3306/mybatch?autoReconnect=true&characterEncoding=UTF-8&allowMultiQueries=true&allowPublicKeyRetrieval=true&useSSL=false
    username: username
    password: password

  batch:
    job:
      enabled: false # 서버 재시작시 배치 자동실행 방지
    jdbc:
      initialize-schema: always
      
job-setting:
  hello:
    name: "helloJob"
    enabled: true
    cron: "30 * * * * ?" # 30초 일때만 실행
  dynamic:
    name: "dynamicJob"
    enabled: true

yml 파일을 조금 수정해줬다. Job 이름을 하드코딩 하는 대신, yml 파일에 등록해놓고 @Value 를 통해 주입하는 방식을 사용할 것이다.


1. JOB

job 이라는 폴더를 만들어서, 이전의 BatchConfig.java 파일을 해당 폴더로 옮겨준다. BatchConfig 클래스는 이제부터 인터페이스로 추상화 될 것이다.

BatchConfig.java

import org.springframework.batch.core.Job;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.transaction.PlatformTransactionManager;

public interface BatchConfig {
    String getJobName();
    boolean isJobEnabled();
    String getCronExpression();
    void setCronExpression(String cronExpression);
    Job createJob(JobRepository jobRepository, PlatformTransactionManager transactionManager);
}

Step과 Tasklet은 한 Job에서 여러개를 구현할 수도 있으니 굳이 추상화 시키진 않았다.

HelloBatchJava.java

기존에 testJob 으로 구현했던 부분이다. BatchConfig 를 implements 해준다.

import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.support.DefaultBatchConfiguration;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@Data
@RequiredArgsConstructor
@Slf4j
public class HelloBatchJob extends DefaultBatchConfiguration implements BatchConfig {

    @Value("${job-setting.hello.cron}")
    private String cronExpression;

    @Value("${job-setting.hello.enabled}")
    private boolean isJobEnabled;

    @Value("${job-setting.hello.name}")
    private String jobName;

    @Bean("hello")
    @Override
    public Job createJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        Job job = new JobBuilder(getJobName(),jobRepository)
                       .incrementer(new RunIdIncrementer())
                       .start(executeStep(jobRepository,transactionManager))
                       .build();
        return job;
    }

    private Step executeStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        Step step = new StepBuilder("helloStep",jobRepository)
                   .allowStartIfComplete(true)
                   .tasklet(helloTasklet(),transactionManager)
                   .build();
        return step;
    }
    private Tasklet helloTasklet(){
        return ((contribution, chunkContext) -> {
            log.info("***** hello batch! *****");
            return RepeatStatus.FINISHED;
        });
    }
}

구현체가 늘어날 수록 createJob 이 여러개가 될 것이기 때문에
@Bean만 있어선 Spring boot 에서 인식하지 못하니, 구분할 수 있게끔 @Bean("hello") 라고 해 주었다.

👉 물론 이 방법만 있는건 아니다

https://ducktopia.tistory.com/80

취향껏 하자.

DynamicBatchJob.java

동적 스케줄링을 적용할 job
DB 연결하기 귀찮고 관련 테이블 예제도 안만들어서 간단하게 cron식을 저장할 배열을 만들고 거기서 랜덤으로 cron식을 골라오도록 했다.

import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

@Configuration
@Data
@RequiredArgsConstructor
@Slf4j
public class DynamicBatchJob implements BatchConfig{

    private String cronExpression;
    @Value("${job-setting.dynamic.enabled}")
    private boolean isJobEnabled;
    @Value("${job-setting.dynamic.name}")
    private String jobName;

    private static final String[] cronExpressions = {
        // 동적 cron 변경을 위한 임시 cron 식 변수
        "0 * * * * ?",
        "10 * * * * ?",
        "20 * * * * ?",
        "30 * * * * ?",
        "40 * * * * ?",
        "50 * * * * ?",
    };

    @Bean("dynamic")
    @Override
    public Job createJob(JobRepository jobRepository, 
                         PlatformTransactionManager transactionManager) {
        Job job = new JobBuilder(getJobName(),jobRepository)
                       .incrementer(new RunIdIncrementer())
                       .start(executeStep(jobRepository,transactionManager))
                       .build();
        return job;
    }

     private Step executeStep(JobRepository jobRepository, 
                              PlatformTransactionManager transactionManager) {
        Step step = new StepBuilder("dynamicStep",jobRepository)
                   .allowStartIfComplete(true)
                   .tasklet(dynamicTasklet(),transactionManager)
                   .build();
        return step;
    }

    private Tasklet dynamicTasklet(){
        return ((contribution, chunkContext) -> {
            log.info("***** dynamic scheduling batch *****");
            return RepeatStatus.FINISHED;
        });
    }

    public String getRandomCronExpression(){
        Random random = new Random();
        List<String> crons = Arrays.asList(cronExpressions);
        String cron = crons.get(random.nextInt(crons.size()));
        return cron;
    }
}

2. SCHEDULER

scheduler 폴더를 만든다.
기존의 BatchScheduler는 추상 클래스가 될 것이다.
BatchConfig를 구현한 구현체의 jobName에 따라 유연하게 동작할 수 있게끔 수정해 두었다.

BatchScheduler.java

import com.example.batch.Job.BatchConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;

@Slf4j
@RequiredArgsConstructor
@Component
public abstract class BatchScheduler {

    protected ThreadPoolTaskScheduler scheduler;
    protected final JobLauncher jobLauncher;
    protected final JobRegistry jobRegistry;
    protected final BatchConfig batchConfig;

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(){
        JobRegistryBeanPostProcessor jobProcessor = new JobRegistryBeanPostProcessor();
        jobProcessor.setJobRegistry(jobRegistry);
        return jobProcessor;
    }

    protected void startScheduler(){
        log.info("================================================");
        log.info(">>> [{}] START",batchConfig.getJobName());
        scheduler = new ThreadPoolTaskScheduler();
        scheduler.initialize();
        log.info("[{}] cron: {}",batchConfig.getJobName(),batchConfig.getCronExpression());
        CronTrigger cronTrigger = new CronTrigger(batchConfig.getCronExpression());
        scheduler.schedule(runJob(),cronTrigger);
    }

    protected void stopScheduler(){
        log.info(">>> [{}] STOP",batchConfig.getJobName());
        if(scheduler == null){
            return ;
        }
        scheduler.shutdown();
    }

    public void updateCronExpression(String newCron) {
        stopScheduler();
        batchConfig.setCronExpression(newCron);
        startScheduler();
    }

    private Runnable runJob() {
        return () -> {
            launch(batchConfig.getJobName());
        };
    }

    private void launch(String jobName){
        String time = LocalDateTime.now().toString();
       try {
           Job job = jobRegistry.getJob(jobName);
           JobParametersBuilder jobParam = new JobParametersBuilder().addString("time", time);
           jobLauncher.run(job, jobParam.toJobParameters());
       } catch (
                NoSuchJobException|
                JobInstanceAlreadyCompleteException |
                JobExecutionAlreadyRunningException |
                JobParametersInvalidException |
                JobRestartException e
       ) {
           throw new RuntimeException(e);
        }
    }
}

ThreadPoolTaskScheduler 는 스레드 풀에 job을 자동으로 등록하고 관리해주는 역할을 한다.

👉 영어 싫을까봐 한국 블로그도 넣어놨다.

https://docs.spring.io/spring-framework/docs/4.3.x/spring-framework-reference/html/scheduling.html
https://velog.io/@rara_kim/Spring-Thread-Pool-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0feat.-Scheduler

HelloBatchScheduler.java

import com.example.batch.Job.HelloBatchJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class HelloBatchScheduler extends BatchScheduler{

    public HelloBatchScheduler(
            JobLauncher jobLauncher,
            JobRegistry jobRegistry,
            HelloBatchJob helloBatchJob
            ) {
        super(jobLauncher, jobRegistry,helloBatchJob);
        if(helloBatchJob.isJobEnabled()){
            startScheduler();
        }
    }
}

DynamicBatchScheduler.java

import com.example.batch.Job.DynamicBatchJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DynamicBatchScheduler extends BatchScheduler {

    public DynamicBatchScheduler(JobLauncher jobLauncher,
                                 JobRegistry jobRegistry,
                                 DynamicBatchJob dynamicBatchJob) {
        super(jobLauncher, jobRegistry, dynamicBatchJob);
        String cron = dynamicBatchJob.getRandomCronExpression();
        dynamicBatchJob.setCronExpression(cron);
        if(dynamicBatchJob.isJobEnabled()){
            startScheduler();
        }
    }
}

각 Job에 대한 스케줄러는 BatchScheduler를 상속받고, 생성자를 구현해주는 것으로 끝난다.

3. SERVICE

동적 스케줄링을 위해 주기적으로 cron 식을 업데이트 시켜줄 service 로직이다.

UpdateScheduleService.java

import com.example.batch.Job.DynamicBatchJob;
import com.example.batch.scheduler.DynamicBatchScheduler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@EnableScheduling
@Slf4j
public class UpdateScheduleService {
    private final DynamicBatchScheduler dynamicBatchScheduler;
    private final DynamicBatchJob dynamicBatchJob;

    @Scheduled(cron = "59 * * * * ?") // 59 초 마다 cron 업데이트 실행
    public void updateCronInDynamicBatch(){
        if(dynamicBatchJob.isJobEnabled()){
            log.info("run update schedule service");
            String updatedCron = dynamicBatchJob.getRandomCronExpression();
            log.info("updated cron: {}",updatedCron);
            dynamicBatchScheduler.updateCronExpression(updatedCron);
        }
    }
}

DynamicBatchScheduler의 updateCronExpression 메서드가 59초마다 호출되면서 cron식을 변경한다. 이 서비스 로직의 주기를 어떻게 설정하냐에 따라 한달에 한번, 일주일에 한번씩 cron식의 업데이트를 수행할 것이다.


결과


처음에 dynamicJob 은 10초에 실행되도록 설정 되었다.

59초에 cron 업데이트가 수행되고, 기존의 job 이 멈춘다.
바뀐 cron식 대로 0초에 dynamicJob이 수행되는 것을 확인할 수 있다.

40초로 변경된 이후에도 정상적으로 수행함을 확인할 수 있었다.

랜덤으로 cron식을 설정하는 부분만 DB에서 스케줄을 읽어오도록 변경한다면 동적 스케줄링을 무리없이 구현할 수 있을것이다.

https://github.com/DEV-asdf-516/spring-batch-exam

0개의 댓글

관련 채용 정보