스프링 배치는 Job 타입의 빈이 생성되면 JobLauncher 객체에 의하여 Job을 실행하고 Job은 Step을 실행한다.
JobRepository는 DB또는 메모리에 배치가 실행될 수 있도록 배치의 메타 데이터를 관리하는 클래스이다.
💡 implementation 'org.springframework.boot:spring-boot-starter-batch'
Step은 Job의 세부 실행 단위이며, N개가 등록돼 실행된다.
Step의 실행단위는 크게 2가지가 있다.
스프링 배치의 실행 단위, 독립적으로 실행할 수 있는 순서 지정된 스텝의 목록. Job의 실행은 JobLauncher에서 실행된다.
Job은 N개의 Step을 실행할 수 있으며, 흐름(Flow)을 관리할 수 있다.
CommandLineJobRunner
: 스크립트 or 명령행 이용JobRegistryBackgroundJobRunner
: 자바 프로세스 내에서 쿼츠나 JMX 후크 같은 스케줄러 이용JobLauncherCommandLineRunner
: Spring boot 서버가 올라갈 때 모든 Job 타입의 빈을 실행application.yml
에 아래 내용을 추가해준다.spring:
batch:
jdbc:
initialize-schema: always
JobLauncherCommandLineRunner
에 의하여 모든 Job이 실행된는 것을 막고 싶다면 application.yml
에 아래 내용 추가spring:
batch:
job:
names: ${job.name:NONE}
RunIdIncrementer는 항상 다른 run.id를 Parameter로 설정
@Bean
public Job helloJob(){
return jobBuilderFactory.get("helloJob")
.incrementer(new RunIdIncrementer())
.start(this.helloStep())
.build();
}
.incrementer
에 new RunIdIncrementer()
로 항상 새로운 id를 부여한다.
package com.example.batch.part3;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
* author : duckbill413
* date : 2023-01-21
* description :
* Tasklet을 사용한 Task 기반 배치 처리
* 100개의 문자열을 리스트에 입력하고 리스트의 사이즈를 출력
**/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class TaskProcessingConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job taskProcessingJob(){
return jobBuilderFactory.get("taskProcessingJob")
.incrementer(new RunIdIncrementer())
.start(this.taskBaseStep())
.build();
}
@Bean
public Step taskBaseStep(){
return stepBuilderFactory.get("taskBaseStep")
.tasklet(this.tasklet())
.build();
}
private Tasklet tasklet(){
return (contribution, chunkContext) -> {
List<String> items = getItems();
log.info("task item size: {}", items.size());
return RepeatStatus.FINISHED;
};
}
private List<String> getItems(){
List<String> items = new ArrayList<>();
for (int i = 0; i < 100; i++) {
items.add(i + " Hello");
}
return items;
}
}
ItemReader, ItemProcessor, ItemWriter의 관계 이해 필요
대량 처리를 하는 경우 Tasklet 보다 비교적 쉽게 구현
예를 들면 10,000개의 데이터 중 1,000개씩 10개의 덩어리로 수행
ItemReader에서 null을 return 할 때 까지 Step은 반복 된다.
ItemProcessor은 ItemReader에서 받아온 데이터를 처리하고 반환하면 step에서 List로 처리된 output 데이터를 ItemWriter에 전달한다.
<INPUT, OUTPUT> chunk(int)
: INPUT과 OUTPUT Type 정의, chunk의 size 정ItemWriter은 List을 받아 write 한다.
Chunk 기반 Batch 사용 예제
package com.example.batch.part3;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
* author : duckbill413
* date : 2023-01-21
* description :
* Chunk 기반 배치 처리 예제
* 100개의 데이터를 10개씩 10번에 걸쳐 실행될 수 있도록 실행
**/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class ChunkProcessingConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job chunkProcessingJob(){
return jobBuilderFactory.get("chunkProcessingJob")
.incrementer(new RunIdIncrementer())
.start(this.chunkBaseStep())
.build();
}
@Bean
public Step chunkBaseStep(){
return stepBuilderFactory.get("chunkBaseStep")
.<String, String> chunk(10)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
private ItemWriter<String> itemWriter() {
return items -> log.info("chunk item size : {}", items.size());
}
private ItemProcessor<String, String> itemProcessor() {
return item -> item + ", Spring Batch";
}
private ItemReader<String> itemReader() {
return new ListItemReader<>(getItems());
}
private List<String> getItems(){
List<String> items = new ArrayList<>();
for (int i = 0; i < 100; i++) {
items.add(i + " Hello");
}
return items;
}
}
Task를 Chunk 처리 나누어서 처리하기
RepeatStatus.CONTINUABLE
로 여러번 반복하여 실행하도록 Task를 구현한다. 종료 조건을 명확하게 지정해 주어야 한다.package com.example.batch.part3;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
* author : duckbill413
* date : 2023-01-21
* description :
* Tasklet을 사용한 Task 기반 배치 처리
* 100개의 문자열을 리스트에 입력하고 리스트의 사이즈를 출력
**/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class TaskSizeProcessingConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job taskSizeProcessingJob(){
return jobBuilderFactory.get("taskSizeProcessingJob")
.incrementer(new RunIdIncrementer())
.start(this.taskSizeBaseStep())
.build();
}
@Bean
public Step taskSizeBaseStep(){
return stepBuilderFactory.get("taskBaseStep")
.tasklet(this.tasklet())
.build();
}
private Tasklet tasklet(){
List<String> items = getItems();
return (contribution, chunkContext) -> {
StepExecution stepExecution = contribution.getStepExecution();
int chunkSize = 10;
int fromIndex = stepExecution.getReadCount();
int toIndex = fromIndex + chunkSize;
if (toIndex > items.size())
toIndex = items.size();
if (fromIndex >= items.size()){
return RepeatStatus.FINISHED;
}
List<String> subList = items.subList(fromIndex, toIndex);
log.info("task sub item size: {}", subList.size());
stepExecution.setReadCount(toIndex);
return RepeatStatus.CONTINUABLE;
};
}
private List<String> getItems(){
List<String> items = new ArrayList<>();
for (int i = 0; i < 99; i++) {
items.add(i + " Hello");
}
return items;
}
}
String parameter = jobParameters.getString(key, defaultValue);
@Value(”#{jobParameters[key]}”)
taskSizeProcessingConfiguration
의 tasklet을 수정하여 생성private Tasklet tasklet(){
List<String> items = getItems();
return (contribution, chunkContext) -> {
StepExecution stepExecution = contribution.getStepExecution();
JobParameters jobParameters = stepExecution.getJobParameters();
String value = jobParameters.getString("chunkSize", "10");
int chunkSize = StringUtils.isNotEmpty(value) ? Integer.parseInt(value) : 10;
int fromIndex = stepExecution.getReadCount();
int toIndex = fromIndex + chunkSize;
if (toIndex > items.size())
toIndex = items.size();
if (fromIndex >= items.size()){
return RepeatStatus.FINISHED;
}
List<String> subList = items.subList(fromIndex, toIndex);
log.info("task sub item size: {}", subList.size());
stepExecution.setReadCount(toIndex);
return RepeatStatus.CONTINUABLE;
};
}
JobParameter
의 chunkSize
를 불러와서 파라메터로 사용한다. Build시에 -chunkSize=20 --job.name=taskJobParameterProcessingJob
로 Program arguments를 수정해주어 실행시킨다.@Bean
@JobScope // @StepScope도 동일
public Step chunkJobParameterBaseStep(@Value("#{jobParameters[chunkSize]}") String chunkSize) {
return stepBuilderFactory.get("chunkJobParameterBaseStep")
.<String, String>chunk(StringUtils.isNotEmpty(chunkSize) ? Integer.parseInt(chunkSize) : 10)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@JobScope
와 @Value
어노테이션을 사용하여 파라메터를 불러온다. @Value("#{jobParameters[chunkSize]}") String chunkSize
@JobScope
는 job 실행 시점에 생성/소멸@StepScope
는 step 실행 시점에 생성/소멸@Value(”#{jobParameters[key]}”)
를 사용하기 위해 @JobScope
와 @StepScope
는 필수public class CustomItemReader<T> implements ItemReader<T> {
private final List<T> items;
public CustomItemReader(List<T> items){
this.items = items;
}
@Override
public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (!items.isEmpty())
return items.remove(0);
return null; // null을 리턴하면 junk 종료를 의미
}
}
id, 이름, 나이, 거주지
1, 이경원, 32, 인천
2, 홍길동, 30, 서울
3, 나주혁, 25, 강원
@Bean
public Step csvFileStep() throws Exception {
return stepBuilderFactory.get("csvFileStep")
.<Person, Person> chunk(10)
.reader(this.csvFileItemReader())
.writer(itemWriter())
.build();
}
private FlatFileItemReader<Person> csvFileItemReader() throws Exception {
DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<Person>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("id", "name", "age", "address");
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSet -> {
int id = fieldSet.readInt("id");
String name = fieldSet.readString("name");
String age = fieldSet.readString("age");
String address = fieldSet.readString("address");
return new Person(id, name, age, address);
});
FlatFileItemReader<Person> itemReader = new FlatFileItemReaderBuilder<Person>()
.name("csvFileItemReader")
.encoding("UTF-8")
.resource(new ClassPathResource("test.csv"))
.linesToSkip(1)
.lineMapper(lineMapper)
.build();
itemReader.afterPropertiesSet(); // 필드 설정값이 정상인지 검증하는 메서드
return itemReader;
}
private JdbcCursorItemReader<Person> jdbcCursorItemReader() throws Exception {
JdbcCursorItemReader jdbcCursorItemReader = new JdbcCursorItemReaderBuilder<Person>()
.name("jdbcCursorItemReader")
.dataSource(dataSource)
.sql("select id, name, age, address from person")
.rowMapper((rs, rowNum) -> new Person(rs.getInt(1),
rs.getString(2),
rs.getString(3),
rs.getString(4)))
.build();
jdbcCursorItemReader.afterPropertiesSet();
return jdbcCursorItemReader;
}
private JdbcPagingItemReader<Person> jdbcPagingItemReader() throws Exception {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("id", 1); // Id 1인 사람만 조회
JdbcPagingItemReader jdbcPagingItemReader = new JdbcPagingItemReaderBuilder<Person>()
.name("jdbcPagingItemReader")
.dataSource(dataSource)
.pageSize(1)
.fetchSize(1)
.rowMapper((rs, rowNum) -> new Person(
rs.getInt("id"),
rs.getString("name"),
rs.getString("age"),
rs.getString("address")
))
.queryProvider(this.createQueryProvider())
.parameterValues(parameterValues)
.build();
jdbcPagingItemReader.afterPropertiesSet();
return jdbcPagingItemReader;
}
private PagingQueryProvider createQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
queryProviderFactoryBean.setDataSource(dataSource);
queryProviderFactoryBean.setSelectClause("select id, name, age, address");
queryProviderFactoryBean.setFromClause("from person");
queryProviderFactoryBean.setWhereClause("where id = :id"); // Where 절
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.DESCENDING);
queryProviderFactoryBean.setSortKeys(sortKeys); // Order 절
return queryProviderFactoryBean.getObject();
}
PagingQeuryProvider
의 setSelectClause
, setFromClause
등을 이용하여 조회를 수행한다. 이때, sql과 같이 where, order 등도 사용 가능하다.FlatFileItemWriter
는 데이터가 매핑된 객체를 파일로 writeprivate ItemWriter<Person> csvFileItemWriter() throws Exception {
BeanWrapperFieldExtractor fieldExtractor = new BeanWrapperFieldExtractor<Person>();
fieldExtractor.setNames(new String[]{"id", "name", "age", "address"});
DelimitedLineAggregator<Person> personDelimitedLineAggregator = new DelimitedLineAggregator<>(); // 각 필드의 값을 하나의 라인에 작성하기 위하여 구분값 필요
personDelimitedLineAggregator.setDelimiter(", ");
personDelimitedLineAggregator.setFieldExtractor(fieldExtractor); // mapping 설정 종료
FlatFileItemWriter<Person> csvFileItemWriter = new FlatFileItemWriterBuilder<Person>()
.name("csvFileItemWriter")
.encoding("UTF-8")
.resource(new FileSystemResource("output/test-output.csv"))
.lineAggregator(personDelimitedLineAggregator)
.headerCallback(writer -> writer.write("id, 이름, 나이, 주소")) // header
.footerCallback(writer -> writer.write("------------------\n")) // footer
.append(true) // 데이터 추가시 파일이 생성되는 것이 아닌 추가되도록 함
.build();
csvFileItemWriter.afterPropertiesSet();
return csvFileItemWriter;
}
headerCallback
과 footerCallback
을 사용하여 데이터 추가의 시작과 끝에 정보를 추가할 수 있다.append
를 true로 하면 데이터 추가시 파일이 이미 존재한다면 새로 파일을 생성하는 것이 아닌 데이터만 뒤에 추가하게 된다.JdbcBatchItemWriter
는 jdbc를 사용에 DB에 writeJdbcBatchItemWriter
는 bulk insert/update/delete 처리jdbc:mysql://localhost/{table-name}?characterEncoding=UTF-8&serverTimezone=UTC&rewriteBatchedStatements=true
private ItemWriter<Person> jdbcBatchItemWriter() {
JdbcBatchItemWriter<Person> personJdbcBatchItemWriter = new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource)
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("insert into person(name, age, address) values(:name, :age, :address)")
.build();
personJdbcBatchItemWriter.afterPropertiesSet();
return personJdbcBatchItemWriter;
}
JdbcBatchItemWriter
은 Jdbc ItemReader와 같이 DataSource
를 이용한다.JdbcBatchItemWriter
는 JDBC를 활용해 Item을 쉽게 Write 할 수 있게 해준다.new BeanPropertyItemSqlParameterSourceProvider<>()
를 이용하여 쉽게 매핑 가능하다.JpaItemWriter
는 JPA Entity 기반으로 데이터를 DB에 writeprivate ItemWriter<Person> jpaItemWriter() throws Exception {
JpaItemWriter<Person> personJpaItemWriter = new JpaItemWriterBuilder<Person>()
.entityManagerFactory(entityManagerFactory)
.build();
personJpaItemWriter.afterPropertiesSet();
return personJpaItemWriter;
}
EntityManagerFactory
를 활용한다.Hibernate: select person0_.id as id1_0_0_, person0_.address as address2_0_0_, person0_.age as age3_0_0_, person0_.name as name4_0_0_ from person person0_ where person0_.id=?
Hibernate: insert into person (address, age, name) values (?, ?, ?)
별다른 설정이 없는 경우 EntityManager
에 Merge 메서드로 실행되기 때문에 select문도 같이 실행되게 된다.
JPA 특성상 Entity가 수정 대상인지 확인하기 위해서 select문을 먼저 실행해 보는 것이다.
new JpaItemWriterBuilder<Person>()
.entityManagerFactory(entityManagerFactory)
.usePersist(true)
.build();
Builder에서 usePersist
를 true
로 놓으면 select문을 실행하지 않는다.
Hibernate: insert into person (address, age, name) values (?, ?, ?)
만일, Entity의 Id 값을 직접 할당하지 않는다면 usePersist
를 사용하지 않더라로 select문은 실행되지 않는다.
- 실습 예제
- 예를 들어 person.id가 짝수인 person 만 return 하는 경우
- ItemWriter는 5개의 person만 받아 처리
- ItemProcessor 실습
private ItemProcessor<? super Person, ? extends Person> itemProcessor() { return item -> { if (item.getId() % 2 == 0) return item; else return null; }; }
private FlatFileItemReader<Person> itemReader() throws Exception {
DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setNames("name", "age", "address");
lineMapper.setLineTokenizer(lineTokenizer);
lineMapper.setFieldSetMapper(fieldSet -> new Person(
fieldSet.readString(0),
fieldSet.readString(1),
fieldSet.readString(2)));
FlatFileItemReader<Person> itemReader = new FlatFileItemReaderBuilder<Person>()
.name("savePersonItemReader")
.encoding("UTF-8")
.linesToSkip(1)
.resource(new FileSystemResource("output/test-input.csv"))
.lineMapper(lineMapper)
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
allow_duplicate == true
인 경우 모든 person을 return 한다.allow_duplicate == false
또는 null 인 경우 person.name이 중복된 데이터는 null로 return 한다.java.util.Map
을 사용하여 쉽게 구현 가능하다.public class DuplicateValidationProcessor<T> implements ItemProcessor<T, T> {
private final Map<String, Object> keyPool = new ConcurrentHashMap<>(); // Multi-thread 환경 지원
private final Function<T, String> keyExtractor;
private final boolean allowDuplicate;
public DuplicateValidationProcessor(Function<T, String> keyExtractor, boolean allowDuplicate) {
this.keyExtractor = keyExtractor;
this.allowDuplicate = allowDuplicate;
}
@Override
public T process(T item) throws Exception {
if (allowDuplicate)
return item;
String key = keyExtractor.apply(item);
if (keyPool.containsKey(key))
return null;
keyPool.put(key, key);
return item ;
}
}
ConcurrentHashMap
은 HashMap과 다르게 멀티 쓰레드 환경을 지원한다.Function
은 데이터 T를 입력받아 key를 생성하여 리턴한다.CompositeItemWriter
를 사용하여 여러개의 ItemWriter 등록이 가능하다.public CompositeItemWriter<Person> compositeItemWriter() throws Exception {
CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriterBuilder<Person>()
.delegates(jpaItemWriter(), itemLogWriter())
.build();
compositeItemWriter.afterPropertiesSet();
return compositeItemWriter;
}
private ItemWriter<? super Person> jpaItemWriter() throws Exception {
JpaItemWriter<Person> personJpaItemWriter = new JpaItemWriterBuilder<Person>()
.entityManagerFactory(entityManagerFactory)
.build();
personJpaItemWriter.afterPropertiesSet();
return personJpaItemWriter;
}
private ItemWriter<? super Person> itemLogWriter() {
return items -> log.info("person size: {}", items.size());
}
CompositeItemWriter
을 활용하여 두개 이상의 ItemWriter
을 등록할 수 있다. delegates
에 등록된 ItemWriter
은 순서대로 실행되므로 주의하자@EnableBatchProcessing
등록package com.example.batch;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
public class TestConfiguration {
}
package com.example.batch.part3.FileItemProcessor;
import com.example.batch.TestConfiguration;
import com.example.batch.part3.PersonRepository;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.batch.test.context.SpringBatchTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBatchTest
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {SavePersonConfiguration.class, TestConfiguration.class})
public class SavePersonConfigurationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private PersonRepository personRepository;
@After
public void tearDown() throws Exception {
personRepository.deleteAll();
}
@Test
public void test_step() {
JobExecution jobExecution = jobLauncherTestUtils.launchStep("savePersonStep");
Assertions.assertThat(jobExecution.getStepExecutions().stream()
.mapToInt(StepExecution::getWriteCount)
.sum())
.isEqualTo(personRepository.count())
.isEqualTo(3);
}
@Test
public void test_allow_duplicate() throws Exception {
// given
JobParameters jobParameters = new JobParametersBuilder()
.addString("allow_duplicate", "false")
.toJobParameters();
// when
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
// then
Assertions.assertThat(jobExecution.getStepExecutions().stream()
.mapToInt(StepExecution::getWriteCount)
.sum())
.isEqualTo(personRepository.count())
.isEqualTo(3);
}
@Test
public void test_not_allow_duplicate() throws Exception {
// given
JobParameters jobParameters = new JobParametersBuilder()
.addString("allow_duplicate", "true")
.toJobParameters();
// when
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
// then
Assertions.assertThat(jobExecution.getStepExecutions().stream()
.mapToInt(StepExecution::getWriteCount)
.sum())
.isEqualTo(personRepository.count())
.isEqualTo(30);
}
}
@SpringBatchTest
를 등록해야 JobLauncherTestUtils
를 등록해 사용할 수 있다.package com.example.batch.part3.FileItemProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterJob;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeJob;
import org.springframework.batch.core.annotation.BeforeStep;
/**
* author : duckbill413
* date : 2023-02-05
* description :
**/
@Slf4j
public class SavePersonListener {
public static class SavePersonStepExecutionListener {
@BeforeStep
public void beforeStep(StepExecution stepExecution){
log.info("beforeStep");
}
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution){
log.info("afterStep : {}", stepExecution.getWriteCount());
if (stepExecution.getWriteCount() == 0)
return ExitStatus.FAILED;
return stepExecution.getExitStatus();
}
}
public static class SavePersonJobExecutionListener implements JobExecutionListener{
@Override
public void beforeJob(JobExecution jobExecution) {
log.info("beforeJob");
}
@Override
public void afterJob(JobExecution jobExecution) {
int sum = jobExecution.getStepExecutions().stream().mapToInt(StepExecution::getWriteCount).sum();
log.info("afterJob : {}", sum);
}
}
public static class SavePersonAnnotationJobExecutionListener {
@BeforeJob
public void beforeJob(JobExecution jobExecution) {
log.info("annotation beforeJob");
}
@AfterJob
public void afterJob(JobExecution jobExecution) {
int sum = jobExecution.getStepExecutions().stream().mapToInt(StepExecution::getWriteCount).sum();
log.info("annotation afterJob : {}", sum);
}
}
}
@Bean
public Job savePersonJob() throws Exception {
return this.jobBuilderFactory.get("savePersonJob")
.incrementer(new RunIdIncrementer())
.start(this.savePersonStep(null))
.listener(new SavePersonListener.SavePersonJobExecutionListener())
.listener(new SavePersonListener.SavePersonAnnotationJobExecutionListener())
.build();
}
@Bean
@JobScope
public Step savePersonStep(@Value("#{jobParameters[allow_duplicate]}") Boolean allowDuplicate) throws Exception {
return this.stepBuilderFactory.get("savePersonStep")
.<Person, Person>chunk(10)
.reader(this.itemReader())
.processor(new DuplicateValidationProcessor<>(Person::getName, allowDuplicate))
.writer(this.compositeItemWriter())
.listener(new SavePersonListener.SavePersonStepExecutionListener())
.build();
}
- 추가 요구 사항 예제
- Person.name이 empty String 인 경우 NotFoundNameException 발생
- NotFoundNameException이 3번 이상 발생하는 경우 Step 실패 처리
- SkipListener가 실행 되는 조건
- 에러 발생 횟수가 skipLimit 이하 인 경우
- skipLimit(2), throw Exception이 3번 발생하면 실행되지 않는다.
- skipLimit(3), throw Exception이 3번 발생하면 실행된다.
- skip 설정 조건에 해당하는 경우에만 실행
- SkipListner는 항상 faultTolerant() 메소드 후 선언
- 예제 코드
@Bean @JobScope public Step savePersonStep(@Value("#{jobParameters[allow_duplicate]}") Boolean allowDuplicate) throws Exception { return this.stepBuilderFactory.get("savePersonStep") .<Person, Person>chunk(10) .reader(this.itemReader()) .processor(itemProcessor(allowDuplicate)) .writer(this.compositeItemWriter()) .listener(new SavePersonListener.SavePersonStepExecutionListener()) .faultTolerant() // Skip과 같은 예외처리 메소드 지원 .skip(NotFoundNameException.class) .skipLimit(2) .build(); } private ItemProcessor<? super Person, ? extends Person> itemProcessor(Boolean allowDuplicate) throws Exception { DuplicateValidationProcessor<Person> personDuplicateValidationProcessor = new DuplicateValidationProcessor<>(Person::getName, allowDuplicate); ItemProcessor<Person, Person> validationProcessor = item -> { if (item.isNotEmptyName()) return item; throw new NotFoundNameException(); }; CompositeItemProcessor<Person, Person> compositeItemProcessor = new CompositeItemProcessorBuilder() .delegates(validationProcessor, personDuplicateValidationProcessor) .build(); compositeItemProcessor.afterPropertiesSet(); return compositeItemProcessor; }
- `stepBuilderFactory`에 skip 예외 처리 등록을 도와주는 `faultTolerant()`를 등록한다. `faultTolerant()` 뒤에 skip 메서드를 등록해야 한다. - `ItemProcessor`에 Name이 Empty인경우 에러를 Throw하는 ItemProcessor을 추가한다. 여러개의 ItemProcessor 등록을 위하여 ItemWriter와 같이 `CompositeItemProcessor`를 이용한다.
retry는 재시도 하였을 경우 성공할 여지가 있는 경우에 사용되게 된다.
추가 요구사항 예제
- NotFoundNameException이 발생하면, 3번 재시도 후Person.name
을“UNKNOWN”
으로 변경
- Retry 설정한 ItemProcessor 생성public class PersonValidationRetryProcessor implements ItemProcessor<Person, Person> { private final RetryTemplate retryTemplate; public PersonValidationRetryProcessor() { this.retryTemplate = new RetryTemplateBuilder() .maxAttempts(3) // RetryLimit 과 유사 .retryOn(NotFoundNameException.class) .build(); } @Override public Person process(Person item) throws Exception { return this.retryTemplate.execute(context -> { // RetryCallback if (item.isNotEmptyName()) return item; throw new NotFoundNameException(); }, context -> { // RecoveryCallback return item.unknownName(); }); } }
RetryTemplate
를 이용하여 retry 설정을 완료해 준다.
CompositeItemProcessor
에 ItemProcessor 추가 등CompositeItemProcessor<Person, Person> compositeItemProcessor = new CompositeItemProcessorBuilder() .delegates(new PersonValidationRetryProcessor(), validationProcessor, personDuplicateValidationProcessor) .build();
RetryListener
구현 코드
public static class SavePersonRetryListener implements RetryListener{
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
return true; // retry 를 시작하는 설정 true일때 시작
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.info("close"); // retry 종료 후 호출
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.info("onError"); // retry 중 에러 발생 시 호출
}
}
이후 ItemProcessor
의 retryTemplate
에 withListener(new SavePersonRetryListener)
로 등록하여 사용
Spring Batch 실습 예제
cs_diary/spring/spring-batch at main · duckbill413/cs_diary