SpringBoot Batch 정리

devdo·2022년 4월 25일
2

SpringBoot

목록 보기
32/39
post-thumbnail

SpringBatch에 알아야 하는 내용을 정리합니다.

batch란?
요청이 들어올때 마다 실시간으로 데이터를 처리 하는것이 아닌 일괄적으로 모아서 하는 작업을 배치 (Batch) 작업이라고 합니다. 이러한 특성때문에 배치작업은 일반적으로, 정해진 시간에 대량의 데이터를 일괄적으로 처리한다는 특징을 갖습니다.

절대 스케쥴러라는 뜻은 포함하지 않습니다.

스케쥴러는 batch를 보완할 때 쓰입니다. 다시말해, 주기적으로 일괄처리할 때 batch에 주기성을 덧붙이는 것으로 보면 됩니다. ex. cron


출처: https://spring.io/batch


Batch vs Scheduler의 차이

  • 배치(Batch) : 일괄(대량)처리
    사용자와 상호작용 없이 여러 개의 작업을 미리 정해진 순서에 따라 중단없이 처리하는 것
    -> Spring Batch
    ex. 대량의 유저회원들에게 알림 메시지 보내기

중요 용어

1) JobLauncher

Job을 실행하는 런쳐

2) Job

정해진 Step을 실행시킬 작업을 의미로,
Job configuration과 대응되는 단위입니다.

3) Step : Job 내부에서 수행될 독립적으로 실행하고 순서가 지정될 수 있습니다. 1개의 Step 또는 N개의 Step들로 이루어질 수 있습니다.
Job > 여러 Step (2종류: Tasklet, Chunk)


4) 그외 알아두어야 할 용어

batch 메타 테이블

Spring batch가 제공하는 가장 기본적인 기능으로 배치 작업 하는동안 사용되는 모든 메타정보들 (작업 시간, 파라미터, 정상수행 여부 …)을 기록하여 작업 중에 사용하거나 모니터링 용도로 사용 할 수 있게 해줍니다.

이러한 이유로 Spring batch를 사용하기 위한 ‘첫번째 단계’는 다름이 아닌 메타 데이터 스키마을 구성하는 것 입니다.(application.yml - batch:jdbc:initialize-schema: always)

Spring Batch에서는 DB를 통해 완료/실패 와 같은 상태관리를 합니다.
크게 4가지 상태를 DB에 저장합니다.

1 이전 실행 Job History
2 실패한 Batch와 Parameter / 성공한 Job
3 실행 재개 지점
4 Job 기준 Step현황과 성공/실패 여부


Tasklet vs Chunk

: 간단하게 구성할 때는 Tasklet, 하지만 스프링배치는 Chunk 지향입니다.

Task 기반 : 하나의 작업 기반으로 실행
Chunk 기반 : 하나의 큰 덩어리를 n개씩 나눠서 이 단위로 실행.
즉, chunk 단위로 commit과 rollback이 이루어지는 것임.

Step의 종류는 아래 3가지입니다.(Chunk 기준)

  • ItemReader
  • ItemWriter
  • ItemProcessor(필수x)

SpringBatch의 Step(Tasklet, Chunk)들은 아래 그림대로 과정을 거칩니다.

  • ItemReader : 단순하게 아이템 하나를 읽는 Strategy이다.
    배치 어플리케이션은 데이터를 읽는 것으로 시작,
    다양한 DataSource(꼭 db을 말하는 것은 아님!)로부터 데이터를 읽을 수 있는 수 있는 구현체를 잘 정의하는 게 중요하다.

  • ItemProcessor : Reader로부터 받아온 Item을 가공하는 담당.
    Item이 <I>타입이었다면 <O>타입으로 변경해서 넘길 수 있음.
    배치 어플리케이션의 핵심 비즈니스 로직이 들어가게 됨.
    하지만 따로 가공할 로직이 없다면 만들지 않아도됨(Optional)
    프로세서는 개발자가 직접 구현하는 영역이기 때문에 제공되는 구현체의 종류도 Reader에 비해 별로 없음.

  • ItemWriter: Reader, Processor로부터 받아온 Item에 대한 마지막 처리 단계(마지막이니 void) - db에 저장하거나, 파일로 쓰거나, 이벤트를 발행하거나 등등...


JobLauncher와 JobParameter

  • JobLauncher :
    JobLauncher는 배치 Job을 실행시키는 역할로,
    Job과 Parameter를 받아서 실행하고 JobExecution를 반환합니다.
    Spring Batch는 SimpleJobLauncher 라는 단일 JobLauncher 만 제공합니다. 배치 잡이 실행되면 JobInstance가 생성됩니다.
  • JobParameter : Spring Batch는 외부 혹은 내부에서 파라미터를 받아 여러 Batch 컴포넌트에서 사용할 수 있게 지원하고 있습니다.

java.util.Map<String, JobParameter> 객체래퍼(wrapper) 객체입니다. 해당 파라미터는 변환도 가능합니다.

Job Parameter를 사용하기 위해선 항상 Spring Batch 전용 Scope를 선언해야 합니다.
크게 @StepScope@JobScope 2가지가 있습니다

// SpEL로 선언해서 사용할 수 있습니다.
@Value("#{jobParameters[파라미터명]}")

관련블로그 : https://devfunny.tistory.com/476


설정

build.gradle

implementation 'org.springframework.boot:spring-boot-starter-batch'
testImplementation 'org.springframework.batch:spring-batch-test'

application.yml

spring:
  batch:
    job:
      names: ${job.name:NONE}
    jdbc:
      initialize-schema: always  ## batch 메타 테이블을 생성할려면 always -> never
 # main:
   # web-application-type: none  ## 배치가끝나고 어플리케이션도 끝남.

MainApplication.java

@EnableBatchProcessing 추가

@EnableBatchProcessing
@SpringBootApplication
public class MainApplication {

    public static void main(String[] args) {
        SpringApplication.run(MainApplication.class, args);
    }

}

기본코드 예시

@Slf4j
@RequiredArgsConstructor
@Configuration
public class SimpleJobConfiguration {
    private final JobBuilderFactory jobBuilderFactory; // 생성자 DI 받음
    private final StepBuilderFactory stepBuilderFactory; // 생성자 DI 받음

    @Bean
    public Job simpleJob() {	// Job
        return jobBuilderFactory.get("simpleJob")
                                .start(simpleStep1())
                                .build();
    }

    @Bean
    public Step simpleStep1() {	// Step -> tasklet으로 간단하게 실행
        return stepBuilderFactory.get("simpleStep1")
                                 .tasklet((contribution, chunkContext) -> {
                                     log.info(">>>>> This is Step1");
                                     return RepeatStatus.FINISHED;
                                 })
                                 .build();
    }
}

Batch를 시작하는 방법

1) Run Configuration - Program arguments 설정

--spring.batch.job.names=helloJob // application.yml에 지정된 job.name이다.

2) BatchController 설정
jobLauncher, JobParameter 기본 개념이 필요하다. 아래 예시정도면 돌아가는 것을 확인할 수 있을 것이다.

@RestController
@RequestMapping("/batch")
@RequiredArgsConstructor
public class BatchController {

    private final JobLauncher jobLauncher;
    private final Job notificationAlarmJob;

    @GetMapping("/job")
    public String startJob() throws Exception {
        System.out.println("Starting the batch job");
        System.out.println("job: " +notificationAlarmJob);

        Map<String, JobParameter> parameters = new HashMap<>();
        parameters.put("timestamp", new JobParameter(System.currentTimeMillis()));
        JobExecution jobExecution = jobLauncher.run(notificationAlarmJob, new JobParameters(parameters));
        return "Batch job "+ jobExecution.getStatus();

    }
}

Chunk Orientation, Batch Transaction 에 대해

스프링 배치는 Chunk 지향 처리입니다. Chunk 단위로 트랜잭션을 다룹니다. 트랜잭션처럼 롤백도 됩니다.

  • 읽어야 할 또는, 처리할 아이템이 굉장히 많을때 트랜잭션을 한번에
    이어가는 것 보다 일정 주기로 여러번 실행하는 것이 안정적이다.

안정적?

  • Fault Tolerant (장애 허용)
  • 실패 시 다양한 처리(ex.rollback)를 할 수 있게 해준다.

https://docs.spring.io/spring-batch/docs/current/reference/html/step.html#chunkOrientedProcessing

Page Size와 Chunk Size

Spring Batch에서 일반적으로 Reader로 PagingItemReader를 많이들 사용합니다.
일반적으로 Page Size와 Chunk Size 는 서로 비슷해보이지만 서로 다른 의미입니다.

  1. Chunk Size: 한번에 처리될 트랜잭션 단위
  2. Page Size: 한번에 조회할 Item의 양

ItemReader에서 Read가 이루어지는 경우에 doReadPage(); 코드가 실행됩니다.

Read는 Page Size 단위로 이루어지며 쿼리 실행시 Page의 Size를 지정하기 위한 용도입니다
Chunk는 Item이 처리되는 단위이며 이 때문에 Chunk Size와 Page Size가 다를 경우 불필요한 Read가 발생할 수 있습니다.

Step을 만들 때 지정하는 Chunk 갯수만큼의 인자를 받게 되는 것을 잊으면 안됩니다.

// 예시
Paging Reader의 page_size = 10, Chunk = 100 이라면
Reader가 10번 Page Read를 하고,
Processor 100번 처리할 때마다
Writer를 실행한다.

즉 Chunk Size와 Page Size를 일치 시키는게 보편적으로 좋은 방법입니다

JPA에서의 영속성 컨텍스트가 깨지는 문제도 있을 수 있다고 합니다
관련 블로그 : https://jojoldu.tistory.com/146


Curosr와 Paging에 따른 구현 방식

Cursor 기반 ItemReader 구현체

  • JdbcCursorItemReader
  • HibernateCursorItemReader
  • StoredProcedureItemReader

Paging 기반 ItemReader 구현체

  • JdbcPagingItemReader
  • HibernatePagingItemReader
  • JpaPagingItemReader

구현 예시

@Slf4j
@RequiredArgsConstructor
@Configuration
public class AllReadJobConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    private static final int CHUNK_SIZE = 4;
    private static final int FETCH_SIZE = 4;

    @Bean
    public Job allReadJob(
            Step allReadStep
    ) {
        return jobBuilderFactory.get("allReadJob")
                .start(allReadStep)
                .build();
    }

    @Bean
    public Step allReadStep(
            ItemReader<Schedule> allReadPagingReader,
            ItemWriter<Schedule> allReadWriter
    ) {
        return stepBuilderFactory.get("allReadStep")
                .<Schedule, Schedule>chunk(CHUNK_SIZE)
                .reader(allReadPagingReader)
                .writer(allReadWriter)
                .allowStartIfComplete(true)
                .build();
    }

	// Cursor 기반 ItemReader 구현체
    @Bean
    public JdbcCursorItemReader<Schedule> allReadReader() {
        return new JdbcCursorItemReaderBuilder<Schedule>()
                .verifyCursorPosition(false)
                .fetchSize(FETCH_SIZE)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Schedule.class))
                .sql("select * from schedules order by id")
                .name("jdbcCursorItemReader")
                .build();
    }
	// Paging 기반 ItemReader 구현체
    @Bean
    public JdbcPagingItemReader<Schedule> allReadPagingReader(
            PagingQueryProvider queryProvider) {
        return new JdbcPagingItemReaderBuilder<Schedule>()
                .pageSize(CHUNK_SIZE)
                .fetchSize(FETCH_SIZE)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Schedule.class))
                .queryProvider(queryProvider)
                .name("jdbcCursorItemReader")
                .build();
    }

    @Bean
    public PagingQueryProvider queryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("*");
        queryProvider.setFromClause("from schedules");

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);

        queryProvider.setSortKeys(sortKeys);
        return queryProvider.getObject();
    }

    @Bean
    public ItemWriter<Schedule> allReadWriter() {
        return list -> log.info("write items.\n" +
                list.stream()
                        .map(s -> s.toString())
                        .collect(Collectors.joining("\n")));
    }
}

테스트 작업

테스트할 JobConfig 클래스

@Configuration
@RequiredArgsConstructor
public class HelloJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean("hellJob")
    public Job helloJob(Step helloStep) {
        return jobBuilderFactory.get("helloJob")
                .incrementer(new RunIdIncrementer())
                .start(helloStep)
                .build();
    }

    @JobScope
    @Bean("helloStep")
    public Step helloStep(Tasklet tasklet) {
        return stepBuilderFactory.get("helloStep")
                .tasklet(tasklet)
                .build();
    }

    @StepScope
    @Bean
    public Tasklet tasklet() {
        return (contribution, chunkContext) -> {
            System.out.println("Hello Spring Batch");
            return RepeatStatus.FINISHED;
        };
    }
}

Test 작업시 필요한 cofing 파일

@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
public class BatchTestConfig {

}

HelloJobConfigTest(Test 파일)

@SpringBatchTest
@SpringBootTest
@ContextConfiguration(classes = {HelloJobConfig.class, BatchTestConfig.class})
public class HelloJobConfigTest {
    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Test
    public void success() throws Exception {
        // when
        JobExecution execution = jobLauncherTestUtils.launchJob();

        // then
        Assertions.assertEquals(execution.getExitStatus(), ExitStatus.COMPLETED);
    }
}


참고

profile
배운 것을 기록합니다.

0개의 댓글