Managing order history with Spring Batch
🍖 Practice project path
Create the Level Enum
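The Level enum itself is not included in this extract, so here is a minimal sketch, assuming a user's level is decided by the total order amount. The level names and thresholds are illustrative (they merely mirror the 200,000 / 300,000 / 500,000 prices used in the dummy data below).

// Hypothetical Level enum; names and thresholds are illustrative
public enum Level {
    NORMAL(0),
    SILVER(200_000),
    GOLD(300_000),
    VIP(500_000);

    private final int minTotalAmount;   // minimum total order amount required for this level

    Level(int minTotalAmount) {
        this.minTotalAmount = minTotalAmount;
    }

    // the level a given total order amount qualifies for
    public static Level getNextLevel(int totalAmount) {
        if (totalAmount >= VIP.minTotalAmount) return VIP;
        if (totalAmount >= GOLD.minTotalAmount) return GOLD;
        if (totalAmount >= SILVER.minTotalAmount) return SILVER;
        return NORMAL;
    }

    // true when the qualified level is higher than the current one
    public static boolean availableLevelUp(Level current, int totalAmount) {
        return current != null && getNextLevel(totalAmount).compareTo(current) > 0;
    }
}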
@RequiredArgsConstructor
public class SaveUsersTasklet implements Tasklet {

    private final int SIZE = 10_000;
    private final UsersRepository usersRepository;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        List<Users> users = createUsers();
        Collections.shuffle(users);
        usersRepository.saveAll(users);
        return RepeatStatus.FINISHED;
    }

    private List<Users> createUsers() {
        List<Users> users = new ArrayList<>();
        for (int i = 0; i < SIZE; i++) {
            users.add(Users.builder()
                    .orders(Collections.singletonList(Orders.builder()
                            .price(1_000)
                            .createdDate(LocalDate.of(2023, 1, 1))
                            .itemName("item " + i)
                            .build()))
                    .name("test username " + i)
                    .build());
        }
        for (int i = 0; i < SIZE; i++) {
            users.add(Users.builder()
                    .orders(Collections.singletonList(Orders.builder()
                            .price(200_000)
                            .createdDate(LocalDate.of(2023, 1, 2))
                            .itemName("item " + i)
                            .build()))
                    .name("test username " + i)
                    .build());
        }
        for (int i = 0; i < SIZE; i++) {
            users.add(Users.builder()
                    .orders(Collections.singletonList(Orders.builder()
                            .price(300_000)
                            .createdDate(LocalDate.of(2023, 1, 3))
                            .itemName("item " + i)
                            .build()))
                    .name("test username " + i)
                    .build());
        }
        for (int i = 0; i < SIZE; i++) {
            users.add(Users.builder()
                    .orders(Collections.singletonList(Orders.builder()
                            .price(500_000)
                            .createdDate(LocalDate.of(2023, 1, 4))
                            .itemName("item " + i)
                            .build()))
                    .name("test username " + i)
                    .build());
        }
        return users;
    }
}
40,000 User records are created through SaveUsersTasklet.
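The Orders entity is also not shown; below is a guess at its shape, based only on the builder calls above and the orders table columns queried later (item_name, price, created_date). The id strategy and Lombok annotations are assumptions.

// Hypothetical Orders entity; id strategy and annotations are assumptions
@Entity
@Getter
@Builder
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class Orders {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String itemName;

    private int price;

    private LocalDate createdDate;
}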
@Bean(JOB_NAME + "_saveUserStep") public Step saveUserStep() { return this.stepBuilderFactory.get(JOB_NAME + "_saveUserStep") .tasklet(new SaveUsersTasklet(usersRepository)) .build(); }
Create the saveUserStep.
@Bean(JOB_NAME + "_userLevelUpStep") public Step userLevelUpStep() throws Exception { return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep") .<Users, Users>chunk(CHUNK_SIZE) .reader(this.loadUsersData()) .processor(this.checkUsersData()) .writer(this.fixUsersGradeData()) .build(); } // ItemReader private ItemReader<? extends Users> loadUsersData() throws Exception { JpaPagingItemReader<Users> jpaPagingItemReader = new JpaPagingItemReaderBuilder<Users>() .name(JOB_NAME + "_loadUsersData") .entityManagerFactory(entityManagerFactory) .pageSize(CHUNK_SIZE) .queryString("select u from Users u") .build(); jpaPagingItemReader.afterPropertiesSet(); return jpaPagingItemReader; } // ItemProcessor private ItemProcessor<? super Users, ? extends Users> checkUsersData() { return user -> { if (user.availableLevelUp()) return user; return null; }; } // ItemWriter private ItemWriter<? super Users> fixUsersGradeData() throws Exception { return users -> users.forEach(user -> { user.levelUp(); usersRepository.save(user); }); }
@Slf4j
@RequiredArgsConstructor
public class UsersItemListener {

    private final UsersRepository usersRepository;

    @AfterJob
    public void showJobResult(JobExecution jobExecution) {
        Collection<Users> users = usersRepository.findAllByUpdatedDate(LocalDate.now());

        Date startTime = jobExecution.getStartTime();
        Date endTime = jobExecution.getEndTime();
        long totalTime = endTime.getTime() - startTime.getTime();

        log.info("총 데이터 처리 {}건, 처리 시간 : {}millis", users.size(), totalTime);
    }
}
// Orders association in the Users class
@OneToMany(cascade = CascadeType.PERSIST, fetch = FetchType.EAGER)
@JoinColumn(name = "users_id")
private List<Orders> orders = new ArrayList<>();

// Sums the user's total order amount
private int getTotalAmount() {
    return this.orders.stream().mapToInt(Orders::getPrice).sum();
}
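The availableLevelUp() and levelUp() methods called by the processor and writer are not shown in the post either; here is a sketch of what they might look like inside Users, assuming the Level enum sketched earlier and level/updatedDate fields (the listener later queries updatedDate via findAllByUpdatedDate).

// Hypothetical Users methods; the level and updatedDate fields are assumptions
public boolean availableLevelUp() {
    return Level.availableLevelUp(this.level, this.getTotalAmount());
}

public Level levelUp() {
    Level nextLevel = Level.getNextLevel(this.getTotalAmount());
    this.level = nextLevel;
    this.updatedDate = LocalDate.now();   // lets the listener count today's level-ups
    return nextLevel;
}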
Use a date=2023-02 JobParameter. The 2023년_2월_주문_금액.csv file holds the order statistics for February 1 through the last day of February 2023.

@Getter
@Builder
public class OrderStatistics {
    private String amount;
    private LocalDate date;
}
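For reference, a hypothetical sketch (not from the post) of how the date and path parameters could be handed to the job; the runner class, its wiring, and the output/ directory are illustrative assumptions.

// Hypothetical launcher helper; bean wiring and "output/" are illustrative
@Component
@RequiredArgsConstructor
public class UsersGradeJobRunner {

    private final JobLauncher jobLauncher;
    private final Job usersGradeJob;   // the Job bean registered under JOB_NAME below

    public void runForMonth(String yearMonth) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("date", yearMonth)   // e.g. "2023-02"
                .addString("path", "output/")   // directory for the csv file (illustrative)
                .toJobParameters();
        jobLauncher.run(usersGradeJob, jobParameters);
    }
}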
If the date parameter is not supplied, orderStatisticsStep is not executed.

// Create orderStatisticsStep
@Bean(JOB_NAME + "_orderStatisticsStep")
@JobScope
public Step orderStatisticsStep(@Value("#{jobParameters[date]}") String date,
@Value("#{jobParameters[path]}") String path) throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_orderStatisticsStep")
.<OrderStatistics, OrderStatistics>chunk(CHUNK_SIZE)
.reader(orderStatisticsReader(date))
.writer(orderStatisticsWriter(date, path))
.build();
}
// orderStatisticsStep ItemReader
private ItemReader<? extends OrderStatistics> orderStatisticsReader(String date) throws Exception {
YearMonth yearMonth = YearMonth.parse(date);
Map<String, Object> parameters = new HashMap<>();
parameters.put("startDate", yearMonth.atDay(1));
parameters.put("endDate", yearMonth.atEndOfMonth());
Map<String, Order> sortKey = new HashMap<>();
sortKey.put("created_date", Order.ASCENDING);
JdbcPagingItemReader<OrderStatistics> itemReader = new JdbcPagingItemReaderBuilder<OrderStatistics>()
.name(JOB_NAME + "_orderStatisticsReader")
.dataSource(dataSource)
.rowMapper((rs, rowNum) -> OrderStatistics.builder()
.amount(rs.getString(1))
.date(LocalDate.parse(rs.getString(2), DateTimeFormatter.ISO_DATE))
.build())
.pageSize(CHUNK_SIZE)
.selectClause("sum(price) as amount, created_date")
.fromClause("orders")
.whereClause("created_date >= :startDate and created_date <= :endDate")
.groupClause("created_date")
.parameterValues(parameters)
.sortKeys(sortKey)
.build();
itemReader.afterPropertiesSet();
return itemReader;
}
// orderStatisticsStep ItemWriter
private ItemWriter<? super OrderStatistics> orderStatisticsWriter(String date, String path) throws Exception {
YearMonth yearMonth = YearMonth.parse(date);
String fileName = yearMonth.getYear() + "년_" + yearMonth.getMonthValue() + "월_일별_주문_금액.csv";
BeanWrapperFieldExtractor<OrderStatistics> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[]{"amount", "date"});
DelimitedLineAggregator<OrderStatistics> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(", ");
lineAggregator.setFieldExtractor(fieldExtractor);
FlatFileItemWriter<OrderStatistics> fileItemWriter = new FlatFileItemWriterBuilder<OrderStatistics>()
.name(JOB_NAME + "_orderStatisticsWriter")
.encoding("UTF-8")
.resource(new FileSystemResource(path + fileName))
.lineAggregator(lineAggregator)
.headerCallback(writer -> writer.write("총액, 날짜"))
.append(false)
.build();
fileItemWriter.afterPropertiesSet();
return fileItemWriter;
}
Create a JobExecutionDecider class that checks whether the date variable was passed in the JobParameters.
public class JobParametersDecide implements JobExecutionDecider {
public static final FlowExecutionStatus CONTINUE = new FlowExecutionStatus("CONTINUE");
public final String key;
public JobParametersDecide(String key) {
this.key = key;
}
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String value = jobExecution.getJobParameters().getString(key);
if (StringUtils.isEmpty(value))
return FlowExecutionStatus.COMPLETED;
return CONTINUE;
}
}
@Bean(JOB_NAME)
public Job usersGradeJob() throws Exception {
return this.jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.start(this.saveUserStep())
.next(this.userLevelUpStep())
.listener(new UsersItemListener(usersRepository))
.next(new JobParametersDecide("date"))
.on(JobParametersDecide.CONTINUE.getName())
.to(this.orderStatisticsStep(null, null))
.build()
.build();
}
- start: save the Users dummy data with saveUserStep()
- next: update the Users' Level with userLevelUpStep()
- listener: register UsersItemListener
- next: check whether the date JobParameter was passed in
- on: if the parameter was supplied and CONTINUE is returned, keep going
- to: if CONTINUE, generate the order statistics

SaveUsersTasklet stores 40,000 Users, and the chunk size is 1,000. Execution times of userLevelUpStep (in ms):
| | Run 1 | Run 2 | Run 3 |
|---|---|---|---|
| Simple Step | 11519 | 11157 | 12822 |
| Async Step | 11015 | 13581 | 11091 |
| Multi-Thread Step | 10303 | 8769 | 10127 |
| Partition Step | 8359 | 9034 | 7898 |
| Async+Partition Step | 9518 | 10127 | 8094 |
| Parallel Step | 10550 | 10933 | 10346 |
| Partition+Parallel Step | 8303 | 10156 | 8792 |
The Async+Partition and Partition+Parallel variants can be found in the Git repository.
To make it easy to adjust the TaskExecutor bean used for the performance improvements that follow, modify main as shown below.
@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchApplication {
public static void main(String[] args) {
System.exit(SpringApplication.exit(
SpringApplication.run(SpringBatchApplication.class, args)
));
}
@Bean
@Primary
TaskExecutor taskExecutor(){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(20);
taskExecutor.setThreadNamePrefix("batch-thread-");
taskExecutor.initialize();
return taskExecutor;
}
}
Async Step: Future-based asynchronous processing built on java.util.concurrent. Add the spring-batch-integration dependency:

implementation 'org.springframework.batch:spring-batch-integration' // Async
Change the ItemProcessor and ItemWriter used by userLevelUpStep to AsyncItemProcessor and AsyncItemWriter.

private AsyncItemProcessor<Users, Users> itemProcessor() {
ItemProcessor<Users, Users> itemProcessor = user -> {
if (user.availableLevelUp())
return user;
return null;
};
AsyncItemProcessor<Users, Users> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(itemProcessor);
asyncItemProcessor.setTaskExecutor(this.taskExecutor);
return asyncItemProcessor;
}
private AsyncItemWriter<Users> itemWriter() throws Exception {
ItemWriter<Users> itemWriter = users -> users.forEach(user -> {
user.levelUp();
usersRepository.save(user);
});
AsyncItemWriter<Users> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
Change the <Users, Users>chunk in userLevelUpStep() to <Users, Future<Users>>chunk.
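A sketch of how the step definition might look after this change (not shown verbatim in the post), reusing loadUsersData() and the async itemProcessor()/itemWriter() methods above:

@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
    return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
            // the output type becomes Future<Users>: AsyncItemProcessor returns a Future
            // that AsyncItemWriter resolves before delegating to the real ItemWriter
            .<Users, Future<Users>>chunk(CHUNK_SIZE)
            .reader(this.loadUsersData())
            .processor(this.itemProcessor())
            .writer(this.itemWriter())
            .build();
}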
A Multi-Thread Step can be applied relatively simply.
@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
.<Users, Users>chunk(CHUNK_SIZE)
.reader(this.loadUsersData())
.processor(this.checkUsersData())
.writer(this.fixUsersGradeData())
.taskExecutor(this.taskExecutor) // added so chunks are processed by multiple threads
.throttleLimit(8) // process chunks with 8 threads concurrently (default is 4)
.build();
}
It can be used simply by setting taskExecutor and throttleLimit. throttleLimit is the number of threads used concurrently (default: 4).

Partition Step: a manager (Master) step creates multiple Slave steps, and the work is multi-threaded at the step level.

Create a Partitioner class. To apply a Partition Step, a class is needed that specifies how many items each Slave step should process. Here, a class is created that loads the users' min and max id and assigns an equal number of items to each partition.
public class UserLevelUpPartitioner implements Partitioner {
private final UsersRepository usersRepository;
public UserLevelUpPartitioner(UsersRepository usersRepository) {
this.usersRepository = usersRepository;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
int minId = usersRepository.findMinId(); // 1
int maxId = usersRepository.findMaxId(); // 40_000
int targetSize = (maxId - minId) / gridSize + 1; // 5000
/**
 * Create one ExecutionContext per partition, e.g.
 * partition0 : 1 ~ 5_000
 * partition1 : 5_001 ~ 10_000
 * ...
 */
Map<String, ExecutionContext> result = new HashMap<>();
int number = 0;
int start = minId;
int end = start + targetSize -1;
while (start <= maxId){
ExecutionContext value = new ExecutionContext();
result.put("partition"+number, value);
if (end > maxId)
end = maxId;
value.putLong("minId", start);
value.putLong("maxId", end);
start += targetSize;
end += targetSize;
number += 1;
}
return result;
}
}
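The repository methods used above (findMinId, findMaxId) and earlier by the listener (findAllByUpdatedDate) are not shown in the post; below is a guess at how they could be declared, assuming a Long id and illustrative JPQL.

// Hypothetical UsersRepository declarations; queries are illustrative
public interface UsersRepository extends JpaRepository<Users, Long> {

    // used by UsersItemListener to count the users updated today
    Collection<Users> findAllByUpdatedDate(LocalDate updatedDate);

    // used by UserLevelUpPartitioner to split the id range
    @Query("select min(u.id) from Users u")
    int findMinId();

    @Query("select max(u.id) from Users u")
    int findMaxId();
}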
@Bean(JOB_NAME + "_userLevelUpStep.manager")
public Step userLevelUpManagerStep() throws Exception {
return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep.manager")
.partitioner(JOB_NAME + "_userLevelUpStep", new UserLevelUpPartitioner(usersRepository))
.step(this.userLevelUpStep())
.partitionHandler(this.taskExecutorPartitionHandler())
.build();
}
@Bean(JOB_NAME + "_taskExecutorPartitionHandler")
public PartitionHandler taskExecutorPartitionHandler() throws Exception {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setStep(this.userLevelUpStep());
handler.setTaskExecutor(this.taskExecutor);
handler.setGridSize(8);
return handler;
}
The reader now has to be exposed as a more concrete type such as JpaPagingItemReader. Because minId and maxId are read from the stepExecutionContext, it also has to be registered with @StepScope.

@Bean(JOB_NAME + "_jpaItemReader")
@StepScope
public JpaPagingItemReader<? extends Users> itemReader(@Value("#{stepExecutionContext[minId]}") Integer minId,
@Value("#{stepExecutionContext[maxId]}") Integer maxId) throws Exception {
Map<String, Object> parameters = new HashMap<>();
parameters.put("minId", minId);
parameters.put("maxId", maxId);
JpaPagingItemReader<Users> jpaPagingItemReader = new JpaPagingItemReaderBuilder<Users>()
.name(JOB_NAME + "_loadUsersData")
.entityManagerFactory(entityManagerFactory)
.pageSize(CHUNK_SIZE)
.queryString("select u from Users u where u.id between :minId and :maxId")
.parameterValues(parameters)
.build();
jpaPagingItemReader.afterPropertiesSet();
return jpaPagingItemReader;
}
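Since the reader is now a @StepScope bean, the step can reference it directly; a sketch assuming the rest of userLevelUpStep stays as before (the nulls are placeholders, the actual minId/maxId are injected from the stepExecutionContext at runtime):

@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
    return this.stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
            .<Users, Users>chunk(CHUNK_SIZE)
            .reader(itemReader(null, null))   // placeholder arguments for the @StepScope proxy
            .processor(this.checkUsersData())
            .writer(this.fixUsersGradeData())
            .build();
}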
Create a saveUserFlow that runs saveUserStep, and a splitFlow that runs userLevelUpStep and orderStatisticsStep at the same time.

@Bean(JOB_NAME + "_saveUserFlow")
public Flow saveUserFlow() {
TaskletStep saveUserStep = this.stepBuilderFactory.get(JOB_NAME + "_saveUserStep")
.tasklet(new SaveUsersTasklet(usersRepository))
.build();
return new FlowBuilder<SimpleFlow>(JOB_NAME + "_saveUserFlow")
.start(saveUserStep)
.build();
}
@Bean(JOB_NAME+"_splitFlow")
@JobScope
public Flow splitFlow(@Value("#{jobParameters[date]}") String date) throws Exception {
Flow userLevelUpFlow = new FlowBuilder<SimpleFlow>(JOB_NAME + "_userLevelUpFlow")
.start(userLevelUpStep())
.build();
return new FlowBuilder<SimpleFlow>(JOB_NAME + "_splitFlow")
.split(this.taskExecutor)
.add(userLevelUpFlow, orderStatisticsFlow(date))
.build();
}
private Flow orderStatisticsFlow(String date) throws Exception {
return new FlowBuilder<SimpleFlow>(JOB_NAME+"_orderStatisticsFlow")
.start(new JobParametersDecide("date"))
.on(JobParametersDecide.CONTINUE.getName())
.to(this.orderStatisticsStep(date, null))
.build();
}
@Bean(JOB_NAME)
public Job usersGradeJob() throws Exception {
return this.jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.listener(new UsersItemListener(usersRepository))
.start(this.saveUserFlow())
.next(this.splitFlow(null))
.build()
.build();
}