배치와 스케줄러

smallcherry's techlog·2022년 10월 24일
6
post-custom-banner

배치 (Batch)

배치란?

  • 정의
    • 데이터의 일괄처리 (데이터를 실시간으로 처리하는 개념이 아님)
  • 특징
    • 사용자에게 빠른 응답이 필요하지 않은 서비스에 적용
    • 특정 시간 이후에는 자원을 거의 사용하지 않는 것이 특징
  • 예시
    • 은행의 정산작업
    • 파일정보 일괄 삭제
    • 커머스의 통계 및 집계기능 등

어떤 조건을 만족해야 “배치”인가?

  • 배치는 아래 조건을 만족해야 한다.
    조건설명파일 만료시간 정보 삭제 기능
    1대용량 데이터대량의 데이터를 가져오거나, 전달하거나, 계산하는 등의 처리를 할 수 있어야 한다.대량의 데이터를 삭제한다.
    2자동화심각한 문제 해결을 제외하고 사용자 개입 없이 실행되어야 한다.Scheduler를 이용하여 일정 시간마다 자동으로 실행하도록 한다.
    3견고성잘못된 데이터를 충돌 및 중단 없이 처리할 수 있어야 한다.현재 날짜보다 이전인 데이터만 지워주면 되기 때문에 ‘잘못된 데이터’ 가 존재할 수 없다.
    4신뢰성로깅 및 추적을 통해 무엇이 잘못되었는 지를 추적할 수 있어야 한다.
    5성능지정한 시간 안에 처리를 완료하거나 동시에 실행되는 다른 어플을 방해하지 않도록 수행되어야 한다.

배치 어플리케이션

  • 단발성 대용량 데이터를 처리하는 어플리케이션

스케줄러 (Scheduler)

스케줄러란?

  • 정의
    • 일정한 시간 간격 또는 일정한 시각에 특정 로직을 돌리기 위해 사용하는 것
  • 특징
    • 배치를 구현하기 위해서는 스케줄러를 사용해야 함 → 배치와 스케줄러는 비교 대상이 아님!
  • Spring에서 제공하는 스케줄러
    • Spring Scheduler
    • Spring Quartz

스케줄러 — Spring Scheduler vs Spring Quartz

결론부터 말하자면

만약 단순한 Scheduling에 따른 작업이 필요하시다면 단연코 Spring Scheduler를 추천합니다. Spring Quartz는 좀 더 Scheduling의 세밀한 제어가 필요할 때, 그리고 만들어야하는 Scheduling 서비스 노드가 멀티이기 때문에 클러스터링이 필요할 때 여러분이 만들고자 하는 서비스에 도움이 될 것입니다.

  • 출처
[[Spring] Scheduler 어떤걸 사용해야 할까 ? - Spring Scheduler와 Spring Quartz](https://sabarada.tistory.com/113)

Spring Scheduler

  • 사용법
    • SpringBoot Stater에 기본 제공됨 (별도의 의존성 추가 필요 없음)
    • 사용하기
      • main 함수가 포함된 클래스에 @EnableScheduling 어노테이션 붙인다.
        • 예시
          @EnableScheduling
          @SpringBootApplication
          public class Application() {
              public static void main(String[] args) {
                  SpringApplication.run(Application.class, args);
              }
          }
      • 스케줄링을 원하는 메서드에 @Scheduled어노테이션을 붙여주면 된다. 스케줄링을 할 메서드는 아래 두 개의 조건을 만족해야 한다
        1. return type이 void일 것
        2. parameter가 없을 것
        • 예시
          public class Scheculer() {
          
              @Scheduled(fixedDelay = 1000) // scheduler 끝나는 시간 기준으로 1000 간격으로 실행
              public void scheduleFixedDelayTask() {
                  System.out.println(
                  "Fixed delay task - " + System.currentTimeMillis() / 1000);
              }
          
              @Scheduled(fixedRate = 1000) // scheduler 시작하는 시간 기준으로 1000 간격으로 실행
              public void scheduleFixedRateTask() {
                  System.out.println(
                  "Fixed rate task - " + System.currentTimeMillis() / 1000);
              }
          
              @Scheduled(cron = "0 15 10 15 * ?") // cron에 따라 실행
              public void scheduleTaskUsingCronExpression() {
                  long now = System.currentTimeMillis() / 1000;
                  System.out.println(
                  "schedule tasks using cron jobs - " + now);
              }
          
              @Scheduled(cron = "0 15 10 15 * ?", zone = "Europe/Paris") // cron에 TimeZone 설정 추가
          }
  • 동작 방식
    • 기본적으로 thread 1개를 이용하여 동기 방식으로 실행
      • 1번 스케줄이 끝나지 않으면 2번 스케줄 시작 시간이 되었다고 해도 시작되지 않음
      • 비동기 방식으로 실행하고 싶으면 @EnableAsync 어노테이션을 이용

Spring Quartz

  • 사용법
    • 라이브러리 의존성 추가 필요 implementation "org.springframework.boot:spring-boot-starter-quartz”
    • 사용하기
      • 필수 구현 요소 (옵셔널하게 구현해야 하는 요소들도 있지만 본 글에선 다루지 않음)
        • Job Interface로 제공되며 해당 인터페이스를 구현하여 실제로 실행되는 로직을 작성한다.
          • 코드
            @Configuration
            public class CollectJob implements Job {
            
              private final CollectService collectService;
            
              public CollectJob(CollectService collectService) {
                this.collectService = collectService;
              }
            
              @Override
              public void execute(JobExecutionContext context) throws JobExecutionException {
                System.out.println("[Collect] collect Job Start...");
              }
            }
        • JobDetail Job을 실행시키기 위한 구체적인 정보 (Job에 대한 설명, Job ID)를 가지고 있는 인스턴스. JobBuilder API를 통해 생성할 수 있음.
          • 코드
            @Bean
            public JobDetail tistoryJobDetail() {
            return JobBuilder.newJob().ofType(CollectJob.class)
                .storeDurably()
                .withIdentity("job_detail")
                .withDescription("Invoke Tistory Job service...")
                .build();
            }
        • Trigger Job이 실행되는 조건을 가지고 있는 인스턴스. TriggerBuilder API를 통해 생성할 수 있음. 조건으로 단순히 특정 시간 간격을 설정할 수 있고, Cron 표현식 (Cron: 유닉스 계열의 Job Scheduler, Cron 표현식: Cron Scheduler의 정규 표현식) 으로도 작성할 수 있음.
          • 코드
            @Bean
            public Trigger tistoryTrigger(@Qualifier("tistoryJobDetail") JobDetail job) {
            return TriggerBuilder.newTrigger().forJob(job)
                .withIdentity("tistory_job_trigger")
                .withSchedule(cronSchedule("0 0 9 * * ?")
                    .inTimeZone(TimeZone.getTimeZone("Asia/Seoul")))
                .build();
            }
            application.yml
            spring:
              quartz:
                job-store-type: memory
  • Quartz가 제공하는 기능 (Spring Scheduler에서는 미지원 되는 것들)
    • DB 방식으로 Scheduler 간의 Clustering 기능 지원
      • Spring Quartz Clustering 이란?
        • 정의
          • JobStore기능에 데이터베이스 방식을 사용하여 다중서버 환경에서 서버들 간의 Job, Trigger 정보를 공유하여 클러스터링
            • JobStore 기능(Job과 Trigger에 대한 정보를 어떤 방식으로 저장하는 지를 정의한 기능)으로 메모리방식과 데이터베이스 방식이 사용되는데, 다중 서버 환경에서 데이터베이스 방식을 사용하면 서버들 간의 Job, Trigger 정보를 공유할 수 있으므로 클러스터링이 가능하다.
        • 이점
          • 고가용성: 서버 하나가 다운되더라도 다른 서버에 의해 Job이 실행됨
          • 확장성: Quartz 설정이 된 서버를 구동하면 자동으로 데이터베이스에 스케줄 서버로 등록되므로, 스케일 아웃으로 인해 서버가 늘어나도라도 함께 클러스터로 관리됨
          • 부하 분산: 랜덤분산이지만 여러 서버에 분산하여 실행 가능
    • 메인 스레드를 막지 않고 비동기적으로 동작할 수 있음
    • 스케줄링 관련 이벤트에 대한 콜백 리스너 인터페이스를 제공 → 일종의 플러그인 기능 제공으로, 사용자가 사용자가 유연하게 활용 가능

의사결정 — Quartz Scheduler or Spring Scheduler?

  • 본인의 생각
    • DB 테이블의 정보를 하루에 한 번 삭제 해주는 것만 실행하면 되는 단순한 배치 어플리케이션이고, 파일서버는 다중 서버가 아니므로 Quartz의 큰 이점 중 하나인 클러스터링도 사용할 수 없을 것으로 보여서 Spring Scheduler를 사용하여 구현하는 것이 합리적으로 보임
    • 다만 스레드 활용에 관해, 챗봇의 메인 스레드를 쓰지 않고 스레드를 따로 생성하여 실행하는 것 인지 확인 필요 (Scheduler의 비동기 기능도 활용 가능성 있음)

Spring Batch

Spring Batch

  • 정의
    • Spring 에서 배치 어플리케이션 제작을 위해 제공하는 프레임워크
  • 특징
    • 대용량 레코드 처리에 필수적인 기능 제공
      • 로깅/추적, 트랜잭션 관리, 작업 처리 통계, 작업 재시작, 건너뛰기 등
    • 대용량, 고성능 배치 작업을 가능 하게 함
      • 최적화, 파티셔닝 기술 사용
    • 배치가 실패하여 작업을 재시작하게 된다면 처음부터가 아닌 실패한 지점부터 실행하게 됨
    • 중복 실행 방지 기능 있음
      • 성공한 이력이 있는 배치는 동일한 Parameters로 실행 시 Exception 발생

Spring Batch 용어 (개념)

Job 관련

  • Job
    • 배치 처리 과정을 하나의 단위로 만들어놓은 객체
    • 배치 처리 과정에 있어 전체 계층 최상단에 위치
  • JobInstance
    • Job의 실행 단위.
    • Job을 실행시키게 되면 하나의 JobInstance가 생성됨
    • 예) 1월 1일과 1월 2일에 실행 하면 각각의 JobInstance가 생성. 1월 1일에 실행한 JobInstance가 실패하여 다시 실행을 시키더라도 이 JobInstance는 1월 1일에 대한 데이터만 처리하게 됨
  • JobParameters
    • JobInstance의 구별자 및 JobInstance에 전달되는 매개변수
    • JobParameter의 데이터 타입은 아래 네 가지만 지원
      • String, Double, Long, Date
  • JobExecution
    • JobInstance 실행 시도에 대한 정보를 저장하는 객체
    • JobInstance 실행에 대한 상태, 시작시간, 종료시간, 생성시간 등의 정보를 담고 있음
    • 예) 1월 1일에 실행한 JobInstance가 실패하여 재실행을 하여도 동일한 JobInstance를 실행시키지만, 이 각각 두 번에 대한 JobExecution은 개별로 생기게 됨

Step 관련

  • Step
    • Job의 배치 처리를 정의하고 순차적인 단계를 캡슐화 함
    • Job은 최소한 1개 이상의 Step을 가져야 하며, Job의 실제 일괄처리를 제어하는 모든 정보가 들어있음
  • StepExecution
    • Step 실행 시도에 대한 정보를 저장하는 객체
    • Step 실행에 대한 상태, 시작시간, 종료시간, 생성시간, read 수, commit 수, skip 수 등의 정보들이 저장
    • Job이 여러 개 Step으로 구성되어 있을 경우, 이전 단계의 Step이 실패하게 되면 다음 단계가 실행되지 않음으로, 이후 Step에 대한 StepExecution은 생성되지 않음
    • JobExecution과 동일하게 실제 Step이 시작이 될 때만 생성

실행 및 관리

  • ExecutionContext
    • Job간, Step 간 데이터를 공유할 수 있는 데이터 저장소
    • 종류
      • JobExecutionContext
        • commit 시점에 저장
      • StepExecutionContext
        • 실행 사이에 저장
    • Job 실패 시 ExecutionContext를 통해 마지막 실행값을 재구성할 수 있음
  • JobRepository
    • 배치 처리 정보를 저장하는 컴포넌트(위에서 소개한 컴포넌트)들을 관리
    • Job이 실행되게 되면, JobRepository에 JobExecution과 StepExecution을 생성하게 되며, JobRepository에서 이러한 Execution 정보들을 저장하고 조회하며 사용함
  • JobLauncher
    • Job과 JobParameter를 사용하여 Job을 실행하는 객체

Item 관련

  • ItemReader
    • Step에서 Item을 읽어오는 인터페이스
    • ItemReader에 대한 다양한 인터페이스가 존재, 다양한 방법으로 Item을 읽어올 수 있음.
  • ItemWriter
    • 처리된 데이터를 write할 때 사용.처리 결과물에 따라 Insert, Update 가능, Queue를 사용한다면 send도 가능
      Reader와 동일하게 다양한 인터페이스가 존재
    • Item을 chunk로 묶어서 처리
  • ItemProcessor
    • Reader에서 읽어온 Item 데이터를 처리하는 역할
    • 배치 처리의 필수 요소는 아니며, Reader, Writer, Processor 처리를 분리하여 각각의 역할을 명확히 구분하고 있음

파일 만료시간 정보 삭제 배치

  • Spring Scheduler 이용
    • 단일 Step을 가진 Job으로 구현 할 수 있을 것으로 보임
  • 구현 방법 https://gimmesome.tistory.com/204 : <2. batch + scheduler 구현> 부분 참고하여 구현할 것으로 보임
    • 스크랩 내용

      2. batch + scheduler 구현

      내가 구현하려는 '주기적인 파일 삭제 구현'은 여러개의 step을 필요로 하지 않으므로 단일 step으로만 구성하였다.
      1. build.gradle에 다음 코드를 추가한다.

        build.gradle

        implementation("org.springframework.boot:spring-boot-starter-batch")
        implementation("org.springframework.boot:spring-boot-starter-quartz")
        
        testImplementation 'org.springframework.batch:spring-batch-test'
        
      2. 아래 두줄을 application.properties에 추가한다.

        #spring boot batch + scheduler
        spring.batch.initialize-schema: always		# batch 스키마 자동 생성
        spring.batch.job.enabled=false			# 시작과 동시에 실행되는건 방지
        
      3. intellij 기준으로 'File>Invalidate Caches..' 로 캐시를 지우고 재시작시킨다.

      4. main 함수가 있는 클래스에 @EnableScheduling과 @EnableBatchProcessing을 붙여준다.

        MeetingDocumentApplication.java

        package com.tmax.meeting.document;
        
        import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;
        import org.springframework.scheduling.annotation.EnableScheduling;
        
        @EnableScheduling
        @EnableBatchProcessing
        @SpringBootApplication
        public class MeetingDocumentApplication {
        
          public static void main(String[] args) {
            SpringApplication.run(MeetingDocumentApplication.class, args);
          }
        
        }
        
      5. batch 파일과 scheduler파일을 만든다.

        https://s3-us-west-2.amazonaws.com/secure.notion-static.com/eb4ed1f8-8ee0-4544-96d8-f165133dc69f/img.png

      6. batch 파일에서 수행하고자 하는 Job을 step으로 구성한다.

        config / BatchConfig.java

        package com.tmax.meeting.document.config;
        
        import com.tmax.meeting.document.model.Document;
        import com.tmax.meeting.document.repository.DocumentRepository;
        import com.tmax.meeting.document.service.DocService;
        import java.util.List;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.batch.core.Job;
        import org.springframework.batch.core.Step;
        import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
        import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
        import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
        import org.springframework.batch.repeat.RepeatStatus;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import java.time.LocalDateTime;
        
        @Slf4j
        @Configuration
        @EnableBatchProcessing
        public class BatchConfig {
        
          @Autowired
          public JobBuilderFactory jobBuilderFactory;
        
          @Autowired
          public StepBuilderFactory stepBuilderFactory;
        
          @Autowired
          private DocumentRepository documentRepository;
        
          @Autowired
          private DocService docService;
        
          @Bean
          public Job job() {
        
            Job job = jobBuilderFactory.get("job")
                .start(step())
                .build();
        
            return job;
          }
        
          @Bean
          public Step step() {
            return stepBuilderFactory.get("step")
                .tasklet((contribution, chunkContext) -> {
                  log.info("Step!");
                  // 업데이트 시각이 일주일 이전인 문서 목록을 가져옴
                  // 1. 네이티브 쿼리 사용
                  List<Document> limitedDocuments = documentRepository.selectLimitedDocument();
                  // 2. JPQL 사용
                  // LocalDateTime now = LocalDateTime.now();
                  // LocalDateTime aWeekAgo = now.minusDays(7);
                  // List<Document> limitedDocuments = documentRepository.findByUpdateAtLessThan(aWeekAgo)
        
                  if (limitedDocuments.size() > 0 && limitedDocuments != null) {
                    for (Document document : limitedDocuments) {
                      // deleteDocument는 document_id를 받아 서버와 db에서 문서 삭제를 구현하는 서비스
                      docService.deleteDocument(document.getDocumentId());
                    }
                  }
                  return RepeatStatus.FINISHED;
                })
                .build();
          }
        }
        

        업데이트 시각이 일주일 이전인 문서 목록을 가져오는 쿼리문은 네이티브 쿼리, JPA로 구현해보았는데

        1. 네이티브 쿼리 사용

          respository /DocumentRespository.java

          @Repository
          public interface DocumentRepository extends JpaRepository<Document, Long>{
            @Query(value = "SELECT * FROM document WHERE DATE(updated_at) < DATE_SUB(NOW(), INTERVAL 7 DAY)", nativeQuery = true)
            List<Document> selectLimitedDocument();
          }
          

          JPA로 작성하려고 보니 날짜계산이 들어가서 어떻게 할까 하다가 Querydsl 쓰기 귀찮아서... ㅎ 네이티브쿼리로 한것이다.
          날짜 계산에 DATE_SUB( )을 사용한다는거!

        1. JPQL 사용

        JPQL로 하려면 LocalDateTime을 사용해서 7일전 날짜를 변수로 하여 이보다 빠른 update_at 값을 가지는 것을 가져오게 한다.

        config / BatchConfig.java 중..

        import java.time.LocalDateTime;
        
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime aWeekAgo = now.minusDays(7);
        
        List<Document> limitedDocuments = documentRepository.findByUpdatedAtLessThan(aWeekAgo)
        

        이렇게 7일전 날짜변수를 넘기면..> respository /DocumentRespository.java

        import java.time.LocalDateTime;
        
        @Repository
        public interface DocumentRepository extends JpaRepository<Document, Long>{
          List<Document> findByUpdatedAtLessThan(LocalDateTime aWeekAgo);
        }
        

        위 처럼 받아서 jpa를 수행한다.

      7. batch를 수행하기 위한 scheduler를 scheduler 파일에 구성한다. (매일 오전 9시에 실행되도록)

        config / BatchScheduler.java

        package com.tmax.meeting.document.config;
        
        import java.util.HashMap;
        import java.util.Map;
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.batch.core.JobParameter;
        import org.springframework.batch.core.JobParameters;
        import org.springframework.batch.core.JobParametersInvalidException;
        import org.springframework.batch.core.launch.JobLauncher;
        import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
        import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.scheduling.annotation.Scheduled;
        import org.springframework.stereotype.Component;
        
        @Slf4j
        @Component
        public class BatchScheduler {
        
          @Autowired
          private JobLauncher jobLauncher;
        
          @Autowired
          private BatchConfig batchConfig;
        
          @Scheduled(cron = "0 0 9 * * *")
          public void runJob() {
        
            // job parameter 설정
            Map<String, JobParameter> confMap = new HashMap<>();
            confMap.put("time", new JobParameter(System.currentTimeMillis()));
            JobParameters jobParameters = new JobParameters(confMap);
        
            try {
              jobLauncher.run(batchConfig.job(), jobParameters);
            } catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException | org.springframework.batch.core.repository.JobRestartException e) {
        
              log.error(e.getMessage());
            }
          }
        }
        

        위에서 jobLauncher.run()메소드는 첫번째 파라미터로 Job, 두번째 파라미터로 Job Parameter를 받고 있다.
        Job Parameter의 역할은 반복해서 실행되는 Job의 유일한 ID이다.

        0000-00-00 01:30:43.702  INFO 16963 --- [   scheduling-1] o.s.batch.core.step.AbstractStep         : Step: [tutorialStep] executed in 14ms
        ... 생략 ...
        0000-00-00 01:30:48.748  INFO 16963 --- [   scheduling-1] o.s.batch.core.job.SimpleStepHandler     : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=tutorialStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
        

        위처럼, Job Parameter에 동일한 값이 세팅되도록 하면 두번째부터 실행되는 Job의 Step은 실행되지 않는다.

Reference

배치와 스케줄러

Spring Batch와 Scheduler

[스프링] batch + scheduler로 주기적인 파일 삭제 구현

Quartz Scheduler vs Spring Scheduler

[Spring] Scheduler 어떤걸 사용해야 할까 ? - Spring Scheduler와 Spring Quartz

Quartz Scheduler vs. Spring Scheduler

[Spring] Quartz 란 ( Quartz Scheduler )

Quartz

Spring Quartz Clustering

Quartz Clustering in Spring MVC

Quartz의 이점

https://velog.io/@park2348190/Spring-Boot-환경의-Quartz-Scheduler-활용#왜-quartz를-사용하는가:~:text=본론-,왜 Quartz를 사용하는가%3F,-Quartz는 홈페이지

profile
Java Developer
post-custom-banner

0개의 댓글