SpringBatch에 알아야 하는 내용을 정리합니다.
batch
란?
요청이 들어올때 마다 실시간으로 데이터를 처리 하는것이 아닌 일괄적으로 모아서 하는 작업을 배치 (Batch) 작업이라고 합니다. 이러한 특성때문에 배치작업은 일반적으로, 정해진 시간에 대량의 데이터를 일괄적으로 처리한다
는 특징을 갖습니다.
절대 스케쥴러
라는 뜻은 포함하지 않습니다.
스케쥴러는 batch를 보완할 때 쓰입니다. 다시말해, 주기적으로 일괄처리할 때 batch에 주기성을 덧붙이는 것으로 보면 됩니다. ex. cron
여러 개의 작업
을 미리 정해진 순서에 따라 중단없이 처리하는 것스케쥴러(Scheduler)
특정한 시간에
등록한 작업을 자동으로 실행시키는 것
-> Spring Scheduler, Quarts 등
ex. 새벽 12시에 쿠폰 만료(expired)시간 체크하기
1) JobLauncher
Job을 실행하는 런쳐
2) Job
정해진 Step을 실행시킬 작업을 의미로,
Job configuration과 대응되는 단위입니다.
3) Step
: Job 내부에서 수행될 독립적으로 실행하고 순서가 지정될 수 있습니다. 1개의 Step 또는 N개의 Step들로 이루어질 수 있습니다.
Job
> 여러 Step
(2종류: Tasklet
, Chunk
)
4) 그외 알아두어야 할 용어
batch 메타 테이블
Spring batch가 제공하는 가장 기본적인 기능으로 배치 작업 하는동안 사용되는 모든 메타정보들 (작업 시간, 파라미터, 정상수행 여부 …)을 기록하여 작업 중에 사용하거나 모니터링 용도로 사용 할 수 있게 해줍니다.
이러한 이유로 Spring batch를 사용하기 위한 ‘첫번째 단계’는 다름이 아닌 메타 데이터 스키마을 구성하는 것 입니다.(application.yml - batch:jdbc:initialize-schema: always
)
Spring Batch에서는 DB를 통해 완료/실패 와 같은 상태관리를 합니다.
크게 4가지 상태를 DB에 저장합니다.
1
이전 실행 Job History
2
실패한 Batch와 Parameter / 성공한 Job
3
실행 재개 지점
4
Job 기준 Step현황과 성공/실패 여부
: 간단하게 구성할 때는 Tasklet, 하지만 스프링배치는 Chunk 지향
입니다.
Task
기반 : 하나의 작업 기반으로 실행
Chunk
기반 : 하나의 큰 덩어리를 n개씩 나눠서 이 단위로 실행.
즉, chunk 단위로 commit과 rollback이 이루어지는 것임.
Step의 종류는 아래 3가지입니다.(Chunk 기준)
ItemReader
ItemWriter
ItemProcessor
(필수x)SpringBatch의 Step(Tasklet, Chunk)들은 아래 그림대로 과정을 거칩니다.
ItemReader
: 단순하게 아이템 하나를 읽는 Strategy이다.
배치 어플리케이션은 데이터를 읽는 것으로 시작,
다양한 DataSource(꼭 db을 말하는 것은 아님!)로부터 데이터를 읽을 수 있는 수 있는 구현체를 잘 정의하는 게 중요하다.
ItemProcessor
: Reader로부터 받아온 Item을 가공하는 담당.
Item이 <I
>타입이었다면 <O
>타입으로 변경해서 넘길 수 있음.
배치 어플리케이션의 핵심 비즈니스 로직이 들어가게 됨.
하지만 따로 가공할 로직이 없다면 만들지 않아도됨(Optional)
프로세서는 개발자가 직접 구현하는 영역이기 때문에 제공되는 구현체의 종류도 Reader에 비해 별로 없음.
ItemWriter
: Reader, Processor로부터 받아온 Item에 대한 마지막 처리 단계(마지막이니 void
) - db에 저장하거나, 파일로 쓰거나, 이벤트를 발행하거나 등등...
JobLauncher
:JobParameter
: Spring Batch는 외부 혹은 내부에서 파라미터를 받아 여러 Batch 컴포넌트에서 사용할 수 있게 지원하고 있습니다.java.util.Map<String, JobParameter> 객체
의 래퍼(wrapper) 객체
입니다. 해당 파라미터는 변환도 가능합니다.
Job Parameter를 사용하기 위해선 항상 Spring Batch 전용 Scope를 선언해야 합니다.
크게 @StepScope
와 @JobScope
2가지가 있습니다
// SpEL로 선언해서 사용할 수 있습니다.
@Value("#{jobParameters[파라미터명]}")
관련블로그 : https://devfunny.tistory.com/476
build.gradle
implementation 'org.springframework.boot:spring-boot-starter-batch'
testImplementation 'org.springframework.batch:spring-batch-test'
application.yml
spring:
batch:
job:
names: ${job.name:NONE}
jdbc:
initialize-schema: always ## batch 메타 테이블을 생성할려면 always -> never
# main:
# web-application-type: none ## 배치가끝나고 어플리케이션도 끝남.
MainApplication.java
@EnableBatchProcessing
추가
@EnableBatchProcessing
@SpringBootApplication
public class MainApplication {
public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
}
}
@Slf4j
@RequiredArgsConstructor
@Configuration
public class SimpleJobConfiguration {
private final JobBuilderFactory jobBuilderFactory; // 생성자 DI 받음
private final StepBuilderFactory stepBuilderFactory; // 생성자 DI 받음
@Bean
public Job simpleJob() { // Job
return jobBuilderFactory.get("simpleJob")
.start(simpleStep1())
.build();
}
@Bean
public Step simpleStep1() { // Step -> tasklet으로 간단하게 실행
return stepBuilderFactory.get("simpleStep1")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> This is Step1");
return RepeatStatus.FINISHED;
})
.build();
}
}
1) Run Configuration - Program arguments
설정
--spring.batch.job.names=helloJob // application.yml에 지정된 job.name이다.
2) BatchController
설정
jobLauncher
, JobParameter
기본 개념이 필요하다. 아래 예시정도면 돌아가는 것을 확인할 수 있을 것이다.
@RestController
@RequestMapping("/batch")
@RequiredArgsConstructor
public class BatchController {
private final JobLauncher jobLauncher;
private final Job notificationAlarmJob;
@GetMapping("/job")
public String startJob() throws Exception {
System.out.println("Starting the batch job");
System.out.println("job: " +notificationAlarmJob);
Map<String, JobParameter> parameters = new HashMap<>();
parameters.put("timestamp", new JobParameter(System.currentTimeMillis()));
JobExecution jobExecution = jobLauncher.run(notificationAlarmJob, new JobParameters(parameters));
return "Batch job "+ jobExecution.getStatus();
}
}
스프링 배치는 Chunk 지향 처리
입니다. Chunk 단위로 트랜잭션
을 다룹니다. 트랜잭션처럼 롤백도 됩니다.
일정 주기로 여러번 실행하는 것이 안정적
이다.안정적?
ex.rollback
)를 할 수 있게 해준다.https://docs.spring.io/spring-batch/docs/current/reference/html/step.html#chunkOrientedProcessing
Page Size와 Chunk Size
Spring Batch에서 일반적으로 Reader로 PagingItemReader를 많이들 사용합니다.
일반적으로 Page Size와 Chunk Size 는 서로 비슷해보이지만 서로 다른 의미입니다.
Chunk Size
: 한번에 처리될 트랜잭션 단위Page Size
: 한번에 조회할 Item의 양ItemReader에서 Read가 이루어지는 경우에 doReadPage();
코드가 실행됩니다.
Read는 Page Size 단위로 이루어지며 쿼리 실행시 Page의 Size를 지정하기 위한 용도입니다
Chunk는 Item이 처리되는 단위이며 이 때문에 Chunk Size와 Page Size가 다를 경우 불필요한 Read가 발생할 수 있습니다.
Step을 만들 때 지정하는 Chunk 갯수만큼의 인자를 받게 되는 것을 잊으면 안됩니다.
// 예시
Paging Reader의 page_size = 10, Chunk = 100 이라면
Reader가 10번 Page Read를 하고,
Processor 100번 처리할 때마다
Writer를 실행한다.
즉 Chunk Size와 Page Size를 일치 시키는게 보편적으로 좋은 방법입니다
JPA에서의 영속성 컨텍스트가 깨지는 문제도 있을 수 있다고 합니다
관련 블로그 : https://jojoldu.tistory.com/146
Cursor 기반 ItemReader 구현체
- JdbcCursorItemReader
- HibernateCursorItemReader
- StoredProcedureItemReader
Paging 기반 ItemReader 구현체
- JdbcPagingItemReader
- HibernatePagingItemReader
- JpaPagingItemReader
구현 예시
@Slf4j
@RequiredArgsConstructor
@Configuration
public class AllReadJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private static final int CHUNK_SIZE = 4;
private static final int FETCH_SIZE = 4;
@Bean
public Job allReadJob(
Step allReadStep
) {
return jobBuilderFactory.get("allReadJob")
.start(allReadStep)
.build();
}
@Bean
public Step allReadStep(
ItemReader<Schedule> allReadPagingReader,
ItemWriter<Schedule> allReadWriter
) {
return stepBuilderFactory.get("allReadStep")
.<Schedule, Schedule>chunk(CHUNK_SIZE)
.reader(allReadPagingReader)
.writer(allReadWriter)
.allowStartIfComplete(true)
.build();
}
// Cursor 기반 ItemReader 구현체
@Bean
public JdbcCursorItemReader<Schedule> allReadReader() {
return new JdbcCursorItemReaderBuilder<Schedule>()
.verifyCursorPosition(false)
.fetchSize(FETCH_SIZE)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Schedule.class))
.sql("select * from schedules order by id")
.name("jdbcCursorItemReader")
.build();
}
// Paging 기반 ItemReader 구현체
@Bean
public JdbcPagingItemReader<Schedule> allReadPagingReader(
PagingQueryProvider queryProvider) {
return new JdbcPagingItemReaderBuilder<Schedule>()
.pageSize(CHUNK_SIZE)
.fetchSize(FETCH_SIZE)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Schedule.class))
.queryProvider(queryProvider)
.name("jdbcCursorItemReader")
.build();
}
@Bean
public PagingQueryProvider queryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause("*");
queryProvider.setFromClause("from schedules");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
@Bean
public ItemWriter<Schedule> allReadWriter() {
return list -> log.info("write items.\n" +
list.stream()
.map(s -> s.toString())
.collect(Collectors.joining("\n")));
}
}
테스트할 JobConfig 클래스
@Configuration
@RequiredArgsConstructor
public class HelloJobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean("hellJob")
public Job helloJob(Step helloStep) {
return jobBuilderFactory.get("helloJob")
.incrementer(new RunIdIncrementer())
.start(helloStep)
.build();
}
@JobScope
@Bean("helloStep")
public Step helloStep(Tasklet tasklet) {
return stepBuilderFactory.get("helloStep")
.tasklet(tasklet)
.build();
}
@StepScope
@Bean
public Tasklet tasklet() {
return (contribution, chunkContext) -> {
System.out.println("Hello Spring Batch");
return RepeatStatus.FINISHED;
};
}
}
Test 작업시 필요한 cofing 파일
@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
public class BatchTestConfig {
}
HelloJobConfigTest(Test 파일)
@SpringBatchTest
@SpringBootTest
@ContextConfiguration(classes = {HelloJobConfig.class, BatchTestConfig.class})
public class HelloJobConfigTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void success() throws Exception {
// when
JobExecution execution = jobLauncherTestUtils.launchJob();
// then
Assertions.assertEquals(execution.getExitStatus(), ExitStatus.COMPLETED);
}
}