

스프링 배치는 Job 타입의 빈이 생성되면 JobLauncher 객체에 의하여 Job을 실행하고 Job은 Step을 실행한다.
JobRepository는 DB또는 메모리에 배치가 실행될 수 있도록 배치의 메타 데이터를 관리하는 클래스이다.
💡 implementation 'org.springframework.boot:spring-boot-starter-batch'
Step은 Job의 세부 실행 단위이며, N개가 등록돼 실행된다.
Step의 실행단위는 크게 2가지가 있다.
스프링 배치의 실행 단위, 독립적으로 실행할 수 있는 순서 지정된 스텝의 목록. Job의 실행은 JobLauncher에서 실행된다.
Job은 N개의 Step을 실행할 수 있으며, 흐름(Flow)을 관리할 수 있다.
CommandLineJobRunner: 스크립트 or 명령행 이용JobRegistryBackgroundJobRunner: 자바 프로세스 내에서 쿼츠나 JMX 후크 같은 스케줄러 이용JobLauncherCommandLineRunner: Spring boot 서버가 올라갈 때 모든 Job 타입의 빈을 실행application.yml에 아래 내용을 추가해준다.spring:
batch:
jdbc:
initialize-schema: always 

JobLauncherCommandLineRunner에 의하여 모든 Job이 실행된는 것을 막고 싶다면 application.yml에 아래 내용 추가spring:
batch:
job:
names: ${job.name:NONE}
RunIdIncrementer는 항상 다른 run.id를 Parameter로 설정
@Bean
public Job helloJob(){
return jobBuilderFactory.get("helloJob")
.incrementer(new RunIdIncrementer())
.start(this.helloStep())
.build();
}
.incrementer 에 new RunIdIncrementer()로 항상 새로운 id를 부여한다.
package com.example.batch.part3;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
* author : duckbill413
* date : 2023-01-21
* description :
* Tasklet을 사용한 Task 기반 배치 처리
* 100개의 문자열을 리스트에 입력하고 리스트의 사이즈를 출력
**/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class TaskProcessingConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job taskProcessingJob(){
return jobBuilderFactory.get("taskProcessingJob")
.incrementer(new RunIdIncrementer())
.start(this.taskBaseStep())
.build();
}
@Bean
public Step taskBaseStep(){
return stepBuilderFactory.get("taskBaseStep")
.tasklet(this.tasklet())
.build();
}
private Tasklet tasklet(){
return (contribution, chunkContext) -> {
List<String> items = getItems();
log.info("task item size: {}", items.size());
return RepeatStatus.FINISHED;
};
}
private List<String> getItems(){
List<String> items = new ArrayList<>();
for (int i = 0; i < 100; i++) {
items.add(i + " Hello");
}
return items;
}
}ItemReader, ItemProcessor, ItemWriter의 관계 이해 필요
대량 처리를 하는 경우 Tasklet 보다 비교적 쉽게 구현
예를 들면 10,000개의 데이터 중 1,000개씩 10개의 덩어리로 수행

ItemReader에서 null을 return 할 때 까지 Step은 반복 된다.
ItemProcessor은 ItemReader에서 받아온 데이터를 처리하고 반환하면 step에서 List로 처리된 output 데이터를 ItemWriter에 전달한다.
<INPUT, OUTPUT> chunk(int) : INPUT과 OUTPUT Type 정의, chunk의 size 정ItemWriter은 List을 받아 write 한다.
Chunk 기반 Batch 사용 예제
package com.example.batch.part3;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
* author : duckbill413
* date : 2023-01-21
* description :
* Chunk 기반 배치 처리 예제
* 100개의 데이터를 10개씩 10번에 걸쳐 실행될 수 있도록 실행
**/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class ChunkProcessingConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job chunkProcessingJob(){
return jobBuilderFactory.get("chunkProcessingJob")
.incrementer(new RunIdIncrementer())
.start(this.chunkBaseStep())
.build();
}
@Bean
public Step chunkBaseStep(){
return stepBuilderFactory.get("chunkBaseStep")
.<String, String> chunk(10)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
private ItemWriter<String> itemWriter() {
return items -> log.info("chunk item size : {}", items.size());
}
private ItemProcessor<String, String> itemProcessor() {
return item -> item + ", Spring Batch";
}
private ItemReader<String> itemReader() {
return new ListItemReader<>(getItems());
}
private List<String> getItems(){
List<String> items = new ArrayList<>();
for (int i = 0; i < 100; i++) {
items.add(i + " Hello");
}
return items;
}
}
Task를 Chunk 처리 나누어서 처리하기
RepeatStatus.CONTINUABLE 로 여러번 반복하여 실행하도록 Task를 구현한다. 종료 조건을 명확하게 지정해 주어야 한다.package com.example.batch.part3;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
* author : duckbill413
* date : 2023-01-21
* description :
* Tasklet을 사용한 Task 기반 배치 처리
* 100개의 문자열을 리스트에 입력하고 리스트의 사이즈를 출력
**/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class TaskSizeProcessingConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job taskSizeProcessingJob(){
return jobBuilderFactory.get("taskSizeProcessingJob")
.incrementer(new RunIdIncrementer())
.start(this.taskSizeBaseStep())
.build();
}
@Bean
public Step taskSizeBaseStep(){
return stepBuilderFactory.get("taskBaseStep")
.tasklet(this.tasklet())
.build();
}
private Tasklet tasklet(){
List<String> items = getItems();
return (contribution, chunkContext) -> {
StepExecution stepExecution = contribution.getStepExecution();
int chunkSize = 10;
int fromIndex = stepExecution.getReadCount();
int toIndex = fromIndex + chunkSize;
if (toIndex > items.size())
toIndex = items.size();
if (fromIndex >= items.size()){
return RepeatStatus.FINISHED;
}
List<String> subList = items.subList(fromIndex, toIndex);
log.info("task sub item size: {}", subList.size());
stepExecution.setReadCount(toIndex);
return RepeatStatus.CONTINUABLE;
};
}
private List<String> getItems(){
List<String> items = new ArrayList<>();
for (int i = 0; i < 99; i++) {
items.add(i + " Hello");
}
return items;
}
}String parameter = jobParameters.getString(key, defaultValue);@Value(”#{jobParameters[key]}”)taskSizeProcessingConfiguration의 tasklet을 수정하여 생성private Tasklet tasklet(){
List<String> items = getItems();
return (contribution, chunkContext) -> {
StepExecution stepExecution = contribution.getStepExecution();
JobParameters jobParameters = stepExecution.getJobParameters();
String value = jobParameters.getString("chunkSize", "10");
int chunkSize = StringUtils.isNotEmpty(value) ? Integer.parseInt(value) : 10;
int fromIndex = stepExecution.getReadCount();
int toIndex = fromIndex + chunkSize;
if (toIndex > items.size())
toIndex = items.size();
if (fromIndex >= items.size()){
return RepeatStatus.FINISHED;
}
List<String> subList = items.subList(fromIndex, toIndex);
log.info("task sub item size: {}", subList.size());
stepExecution.setReadCount(toIndex);
return RepeatStatus.CONTINUABLE;
};
} JobParameter의 chunkSize를 불러와서 파라메터로 사용한다. Build시에 -chunkSize=20 --job.name=taskJobParameterProcessingJob 로 Program arguments를 수정해주어 실행시킨다.
@Bean
@JobScope // @StepScope도 동일
public Step chunkJobParameterBaseStep(@Value("#{jobParameters[chunkSize]}") String chunkSize) {
return stepBuilderFactory.get("chunkJobParameterBaseStep")
.<String, String>chunk(StringUtils.isNotEmpty(chunkSize) ? Integer.parseInt(chunkSize) : 10)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
} @JobScope와 @Value 어노테이션을 사용하여 파라메터를 불러온다. @Value("#{jobParameters[chunkSize]}") String chunkSize@JobScope는 job 실행 시점에 생성/소멸@StepScope는 step 실행 시점에 생성/소멸@Value(”#{jobParameters[key]}”)를 사용하기 위해 @JobScope와 @StepScope는 필수
public class CustomItemReader<T> implements ItemReader<T> {
private final List<T> items;
public CustomItemReader(List<T> items){
this.items = items;
}
@Override
public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (!items.isEmpty())
return items.remove(0);
return null; // null을 리턴하면 junk 종료를 의미
}
}id, 이름, 나이, 거주지
1, 이경원, 32, 인천
2, 홍길동, 30, 서울
3, 나주혁, 25, 강원@Bean
public Step csvFileStep() throws Exception {
return stepBuilderFactory.get("csvFileStep")
.<Person, Person> chunk(10)
.reader(this.csvFileItemReader())
.writer(itemWriter())
.build();
}
private FlatFileItemReader<Person> csvFileItemReader() throws Exception {
DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<Person>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("id", "name", "age", "address");
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSet -> {
int id = fieldSet.readInt("id");
String name = fieldSet.readString("name");
String age = fieldSet.readString("age");
String address = fieldSet.readString("address");
return new Person(id, name, age, address);
});
FlatFileItemReader<Person> itemReader = new FlatFileItemReaderBuilder<Person>()
.name("csvFileItemReader")
.encoding("UTF-8")
.resource(new ClassPathResource("test.csv"))
.linesToSkip(1)
.lineMapper(lineMapper)
.build();
itemReader.afterPropertiesSet(); // 필드 설정값이 정상인지 검증하는 메서드
return itemReader;
}private JdbcCursorItemReader<Person> jdbcCursorItemReader() throws Exception {
JdbcCursorItemReader jdbcCursorItemReader = new JdbcCursorItemReaderBuilder<Person>()
.name("jdbcCursorItemReader")
.dataSource(dataSource)
.sql("select id, name, age, address from person")
.rowMapper((rs, rowNum) -> new Person(rs.getInt(1),
rs.getString(2),
rs.getString(3),
rs.getString(4)))
.build();
jdbcCursorItemReader.afterPropertiesSet();
return jdbcCursorItemReader;
}private JdbcPagingItemReader<Person> jdbcPagingItemReader() throws Exception {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("id", 1); // Id 1인 사람만 조회
JdbcPagingItemReader jdbcPagingItemReader = new JdbcPagingItemReaderBuilder<Person>()
.name("jdbcPagingItemReader")
.dataSource(dataSource)
.pageSize(1)
.fetchSize(1)
.rowMapper((rs, rowNum) -> new Person(
rs.getInt("id"),
rs.getString("name"),
rs.getString("age"),
rs.getString("address")
))
.queryProvider(this.createQueryProvider())
.parameterValues(parameterValues)
.build();
jdbcPagingItemReader.afterPropertiesSet();
return jdbcPagingItemReader;
}
private PagingQueryProvider createQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
queryProviderFactoryBean.setDataSource(dataSource);
queryProviderFactoryBean.setSelectClause("select id, name, age, address");
queryProviderFactoryBean.setFromClause("from person");
queryProviderFactoryBean.setWhereClause("where id = :id"); // Where 절
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.DESCENDING);
queryProviderFactoryBean.setSortKeys(sortKeys); // Order 절
return queryProviderFactoryBean.getObject();
}PagingQeuryProvider의 setSelectClause, setFromClause 등을 이용하여 조회를 수행한다. 이때, sql과 같이 where, order 등도 사용 가능하다.


FlatFileItemWriter는 데이터가 매핑된 객체를 파일로 writeprivate ItemWriter<Person> csvFileItemWriter() throws Exception {
BeanWrapperFieldExtractor fieldExtractor = new BeanWrapperFieldExtractor<Person>();
fieldExtractor.setNames(new String[]{"id", "name", "age", "address"});
DelimitedLineAggregator<Person> personDelimitedLineAggregator = new DelimitedLineAggregator<>(); // 각 필드의 값을 하나의 라인에 작성하기 위하여 구분값 필요
personDelimitedLineAggregator.setDelimiter(", ");
personDelimitedLineAggregator.setFieldExtractor(fieldExtractor); // mapping 설정 종료
FlatFileItemWriter<Person> csvFileItemWriter = new FlatFileItemWriterBuilder<Person>()
.name("csvFileItemWriter")
.encoding("UTF-8")
.resource(new FileSystemResource("output/test-output.csv"))
.lineAggregator(personDelimitedLineAggregator)
.headerCallback(writer -> writer.write("id, 이름, 나이, 주소")) // header
.footerCallback(writer -> writer.write("------------------\n")) // footer
.append(true) // 데이터 추가시 파일이 생성되는 것이 아닌 추가되도록 함
.build();
csvFileItemWriter.afterPropertiesSet();
return csvFileItemWriter;
}headerCallback과 footerCallback을 사용하여 데이터 추가의 시작과 끝에 정보를 추가할 수 있다.append를 true로 하면 데이터 추가시 파일이 이미 존재한다면 새로 파일을 생성하는 것이 아닌 데이터만 뒤에 추가하게 된다.JdbcBatchItemWriter는 jdbc를 사용에 DB에 writeJdbcBatchItemWriter는 bulk insert/update/delete 처리jdbc:mysql://localhost/{table-name}?characterEncoding=UTF-8&serverTimezone=UTC&rewriteBatchedStatements=trueprivate ItemWriter<Person> jdbcBatchItemWriter() {
JdbcBatchItemWriter<Person> personJdbcBatchItemWriter = new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource)
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("insert into person(name, age, address) values(:name, :age, :address)")
.build();
personJdbcBatchItemWriter.afterPropertiesSet();
return personJdbcBatchItemWriter;
}JdbcBatchItemWriter은 Jdbc ItemReader와 같이 DataSource를 이용한다.JdbcBatchItemWriter는 JDBC를 활용해 Item을 쉽게 Write 할 수 있게 해준다.new BeanPropertyItemSqlParameterSourceProvider<>()를 이용하여 쉽게 매핑 가능하다.JpaItemWriter는 JPA Entity 기반으로 데이터를 DB에 writeprivate ItemWriter<Person> jpaItemWriter() throws Exception {
JpaItemWriter<Person> personJpaItemWriter = new JpaItemWriterBuilder<Person>()
.entityManagerFactory(entityManagerFactory)
.build();
personJpaItemWriter.afterPropertiesSet();
return personJpaItemWriter;
}EntityManagerFactory를 활용한다.Hibernate: select person0_.id as id1_0_0_, person0_.address as address2_0_0_, person0_.age as age3_0_0_, person0_.name as name4_0_0_ from person person0_ where person0_.id=?
Hibernate: insert into person (address, age, name) values (?, ?, ?)별다른 설정이 없는 경우 EntityManager에 Merge 메서드로 실행되기 때문에 select문도 같이 실행되게 된다.
JPA 특성상 Entity가 수정 대상인지 확인하기 위해서 select문을 먼저 실행해 보는 것이다.
new JpaItemWriterBuilder<Person>()
.entityManagerFactory(entityManagerFactory)
.usePersist(true)
.build();
Builder에서 usePersist를 true로 놓으면 select문을 실행하지 않는다.
Hibernate: insert into person (address, age, name) values (?, ?, ?)만일, Entity의 Id 값을 직접 할당하지 않는다면 usePersist를 사용하지 않더라로 select문은 실행되지 않는다.

- 실습 예제
- 예를 들어 person.id가 짝수인 person 만 return 하는 경우
- ItemWriter는 5개의 person만 받아 처리
![]()
- ItemProcessor 실습
private ItemProcessor<? super Person, ? extends Person> itemProcessor() { return item -> { if (item.getId() % 2 == 0) return item; else return null; }; }
private FlatFileItemReader<Person> itemReader() throws Exception {
DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setNames("name", "age", "address");
lineMapper.setLineTokenizer(lineTokenizer);
lineMapper.setFieldSetMapper(fieldSet -> new Person(
fieldSet.readString(0),
fieldSet.readString(1),
fieldSet.readString(2)));
FlatFileItemReader<Person> itemReader = new FlatFileItemReaderBuilder<Person>()
.name("savePersonItemReader")
.encoding("UTF-8")
.linesToSkip(1)
.resource(new FileSystemResource("output/test-input.csv"))
.lineMapper(lineMapper)
.build();
itemReader.afterPropertiesSet();
return itemReader;
}allow_duplicate == true 인 경우 모든 person을 return 한다.allow_duplicate == false또는 null 인 경우 person.name이 중복된 데이터는 null로 return 한다.java.util.Map을 사용하여 쉽게 구현 가능하다.public class DuplicateValidationProcessor<T> implements ItemProcessor<T, T> {
private final Map<String, Object> keyPool = new ConcurrentHashMap<>(); // Multi-thread 환경 지원
private final Function<T, String> keyExtractor;
private final boolean allowDuplicate;
public DuplicateValidationProcessor(Function<T, String> keyExtractor, boolean allowDuplicate) {
this.keyExtractor = keyExtractor;
this.allowDuplicate = allowDuplicate;
}
@Override
public T process(T item) throws Exception {
if (allowDuplicate)
return item;
String key = keyExtractor.apply(item);
if (keyPool.containsKey(key))
return null;
keyPool.put(key, key);
return item ;
}
}ConcurrentHashMap은 HashMap과 다르게 멀티 쓰레드 환경을 지원한다.Function은 데이터 T를 입력받아 key를 생성하여 리턴한다.CompositeItemWriter 를 사용하여 여러개의 ItemWriter 등록이 가능하다.public CompositeItemWriter<Person> compositeItemWriter() throws Exception {
CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriterBuilder<Person>()
.delegates(jpaItemWriter(), itemLogWriter())
.build();
compositeItemWriter.afterPropertiesSet();
return compositeItemWriter;
}
private ItemWriter<? super Person> jpaItemWriter() throws Exception {
JpaItemWriter<Person> personJpaItemWriter = new JpaItemWriterBuilder<Person>()
.entityManagerFactory(entityManagerFactory)
.build();
personJpaItemWriter.afterPropertiesSet();
return personJpaItemWriter;
}
private ItemWriter<? super Person> itemLogWriter() {
return items -> log.info("person size: {}", items.size());
}CompositeItemWriter을 활용하여 두개 이상의 ItemWriter을 등록할 수 있다. delegates에 등록된 ItemWriter은 순서대로 실행되므로 주의하자
@EnableBatchProcessing 등록
package com.example.batch;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
public class TestConfiguration {
}package com.example.batch.part3.FileItemProcessor;
import com.example.batch.TestConfiguration;
import com.example.batch.part3.PersonRepository;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.batch.test.context.SpringBatchTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBatchTest
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {SavePersonConfiguration.class, TestConfiguration.class})
public class SavePersonConfigurationTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private PersonRepository personRepository;
@After
public void tearDown() throws Exception {
personRepository.deleteAll();
}
@Test
public void test_step() {
JobExecution jobExecution = jobLauncherTestUtils.launchStep("savePersonStep");
Assertions.assertThat(jobExecution.getStepExecutions().stream()
.mapToInt(StepExecution::getWriteCount)
.sum())
.isEqualTo(personRepository.count())
.isEqualTo(3);
}
@Test
public void test_allow_duplicate() throws Exception {
// given
JobParameters jobParameters = new JobParametersBuilder()
.addString("allow_duplicate", "false")
.toJobParameters();
// when
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
// then
Assertions.assertThat(jobExecution.getStepExecutions().stream()
.mapToInt(StepExecution::getWriteCount)
.sum())
.isEqualTo(personRepository.count())
.isEqualTo(3);
}
@Test
public void test_not_allow_duplicate() throws Exception {
// given
JobParameters jobParameters = new JobParametersBuilder()
.addString("allow_duplicate", "true")
.toJobParameters();
// when
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);
// then
Assertions.assertThat(jobExecution.getStepExecutions().stream()
.mapToInt(StepExecution::getWriteCount)
.sum())
.isEqualTo(personRepository.count())
.isEqualTo(30);
}
}@SpringBatchTest 를 등록해야 JobLauncherTestUtils를 등록해 사용할 수 있다.
package com.example.batch.part3.FileItemProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterJob;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeJob;
import org.springframework.batch.core.annotation.BeforeStep;
/**
* author : duckbill413
* date : 2023-02-05
* description :
**/
@Slf4j
public class SavePersonListener {
public static class SavePersonStepExecutionListener {
@BeforeStep
public void beforeStep(StepExecution stepExecution){
log.info("beforeStep");
}
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution){
log.info("afterStep : {}", stepExecution.getWriteCount());
if (stepExecution.getWriteCount() == 0)
return ExitStatus.FAILED;
return stepExecution.getExitStatus();
}
}
public static class SavePersonJobExecutionListener implements JobExecutionListener{
@Override
public void beforeJob(JobExecution jobExecution) {
log.info("beforeJob");
}
@Override
public void afterJob(JobExecution jobExecution) {
int sum = jobExecution.getStepExecutions().stream().mapToInt(StepExecution::getWriteCount).sum();
log.info("afterJob : {}", sum);
}
}
public static class SavePersonAnnotationJobExecutionListener {
@BeforeJob
public void beforeJob(JobExecution jobExecution) {
log.info("annotation beforeJob");
}
@AfterJob
public void afterJob(JobExecution jobExecution) {
int sum = jobExecution.getStepExecutions().stream().mapToInt(StepExecution::getWriteCount).sum();
log.info("annotation afterJob : {}", sum);
}
}
}@Bean
public Job savePersonJob() throws Exception {
return this.jobBuilderFactory.get("savePersonJob")
.incrementer(new RunIdIncrementer())
.start(this.savePersonStep(null))
.listener(new SavePersonListener.SavePersonJobExecutionListener())
.listener(new SavePersonListener.SavePersonAnnotationJobExecutionListener())
.build();
}
@Bean
@JobScope
public Step savePersonStep(@Value("#{jobParameters[allow_duplicate]}") Boolean allowDuplicate) throws Exception {
return this.stepBuilderFactory.get("savePersonStep")
.<Person, Person>chunk(10)
.reader(this.itemReader())
.processor(new DuplicateValidationProcessor<>(Person::getName, allowDuplicate))
.writer(this.compositeItemWriter())
.listener(new SavePersonListener.SavePersonStepExecutionListener())
.build();
}


- 추가 요구 사항 예제
- Person.name이 empty String 인 경우 NotFoundNameException 발생
- NotFoundNameException이 3번 이상 발생하는 경우 Step 실패 처리
- SkipListener가 실행 되는 조건
- 에러 발생 횟수가 skipLimit 이하 인 경우
- skipLimit(2), throw Exception이 3번 발생하면 실행되지 않는다.
- skipLimit(3), throw Exception이 3번 발생하면 실행된다.
- skip 설정 조건에 해당하는 경우에만 실행
- SkipListner는 항상 faultTolerant() 메소드 후 선언
- 예제 코드
@Bean @JobScope public Step savePersonStep(@Value("#{jobParameters[allow_duplicate]}") Boolean allowDuplicate) throws Exception { return this.stepBuilderFactory.get("savePersonStep") .<Person, Person>chunk(10) .reader(this.itemReader()) .processor(itemProcessor(allowDuplicate)) .writer(this.compositeItemWriter()) .listener(new SavePersonListener.SavePersonStepExecutionListener()) .faultTolerant() // Skip과 같은 예외처리 메소드 지원 .skip(NotFoundNameException.class) .skipLimit(2) .build(); } private ItemProcessor<? super Person, ? extends Person> itemProcessor(Boolean allowDuplicate) throws Exception { DuplicateValidationProcessor<Person> personDuplicateValidationProcessor = new DuplicateValidationProcessor<>(Person::getName, allowDuplicate); ItemProcessor<Person, Person> validationProcessor = item -> { if (item.isNotEmptyName()) return item; throw new NotFoundNameException(); }; CompositeItemProcessor<Person, Person> compositeItemProcessor = new CompositeItemProcessorBuilder() .delegates(validationProcessor, personDuplicateValidationProcessor) .build(); compositeItemProcessor.afterPropertiesSet(); return compositeItemProcessor; }- `stepBuilderFactory`에 skip 예외 처리 등록을 도와주는 `faultTolerant()`를 등록한다. `faultTolerant()` 뒤에 skip 메서드를 등록해야 한다. - `ItemProcessor`에 Name이 Empty인경우 에러를 Throw하는 ItemProcessor을 추가한다. 여러개의 ItemProcessor 등록을 위하여 ItemWriter와 같이 `CompositeItemProcessor`를 이용한다.
retry는 재시도 하였을 경우 성공할 여지가 있는 경우에 사용되게 된다.
추가 요구사항 예제
- NotFoundNameException이 발생하면, 3번 재시도 후Person.name을“UNKNOWN”으로 변경
- Retry 설정한 ItemProcessor 생성public class PersonValidationRetryProcessor implements ItemProcessor<Person, Person> { private final RetryTemplate retryTemplate; public PersonValidationRetryProcessor() { this.retryTemplate = new RetryTemplateBuilder() .maxAttempts(3) // RetryLimit 과 유사 .retryOn(NotFoundNameException.class) .build(); } @Override public Person process(Person item) throws Exception { return this.retryTemplate.execute(context -> { // RetryCallback if (item.isNotEmptyName()) return item; throw new NotFoundNameException(); }, context -> { // RecoveryCallback return item.unknownName(); }); } }
RetryTemplate를 이용하여 retry 설정을 완료해 준다.
CompositeItemProcessor에 ItemProcessor 추가 등CompositeItemProcessor<Person, Person> compositeItemProcessor = new CompositeItemProcessorBuilder() .delegates(new PersonValidationRetryProcessor(), validationProcessor, personDuplicateValidationProcessor) .build();

RetryListener 구현 코드
public static class SavePersonRetryListener implements RetryListener{
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
return true; // retry 를 시작하는 설정 true일때 시작
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.info("close"); // retry 종료 후 호출
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.info("onError"); // retry 중 에러 발생 시 호출
}
}
이후 ItemProcessor의 retryTemplate에 withListener(new SavePersonRetryListener)로 등록하여 사용
Spring Batch 실습 예제
cs_diary/spring/spring-batch at main · duckbill413/cs_diary