스프링 배치 내용의 대부분은 '기억보다는 기록을' 블로그를 참고하여 공부했습니다.
배치에 대해 공부하고자 하시는 분은 이 글보다 아래 블로그를 보는 게 훨씬 도움이 됩니다.
https://jojoldu.tistory.com/
1탄 https://velog.io/@miz/Spring-Batch-%EC%82%BD%EC%A7%88%EA%B8%B0-1%ED%83%84
2탄 https://velog.io/@miz/Spring-Batch-%EC%82%BD%EC%A7%88%EA%B8%B0-2%ED%83%84
코드는 GitHub에 있습니다.
DataShareBean을 통해 step 간 데이터 공유를 쉽게 만들었습니다.
jojoldu님의 글을 참조하여 파티셔닝으로 처리해보겠습니다.
파티셔닝
- 매니저(마스터)를 이용해 데이터를 쪼개서 나눈 다음 파티션에서 슬레이브가 독립적으로 동작한다.
/**
 * Spring Batch configuration for the partitioned pay-total job ({@code PayTotalJobPartitionJob}).
 *
 * <p>Job flow, as wired in {@link #job()}:
 * <ol>
 *   <li>{@link #step1Manager()} - a partitioned manager step. Each partition produced by
 *       {@link #partitioner(String)} is executed by the worker step {@link #step1()} on the
 *       task executor returned by {@link #executor()}.</li>
 *   <li>{@link #step2(String)} - a tasklet that saves the value stored under
 *       {@code "totalAmount"} in the shared {@code DataShareBean} as a {@code TotalPay}.</li>
 * </ol>
 *
 * <p>NOTE(review): the worker step's writer is invoked from up to {@code poolSize} threads at
 * once, so {@code DataShareBean.addData} has to be thread-safe - confirm in DataShareBean.
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
public class PayTotalJobPartitionConfiguration {

    // Not referenced anywhere in this class as shown.
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    public static final String JOB_NAME = "PayTotalJobPartitionJob";
    public static final String BEAN_PREFIX = JOB_NAME + "_";

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;
    private final PayRepository payRepository;
    private final TotalPayRepository totalPayRepository;
    private final DataShareBean<Long> dataShareBean;

    /** Chunk size of the worker step; also used as the reader's page size. */
    private final static int chunkSize = 1000;

    /** Thread-pool size; also used as the partition grid size. */
    private final static int poolSize = 20;

    /**
     * Builds the fixed-size thread pool on which the partitions are executed.
     *
     * @return an initialized executor with core and max pool size both set to {@code poolSize}
     */
    @Bean(name = JOB_NAME + "_taskPool")
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor taskPool = new ThreadPoolTaskExecutor();
        taskPool.setThreadNamePrefix("partition-thread");
        taskPool.setCorePoolSize(poolSize);
        taskPool.setMaxPoolSize(poolSize);
        taskPool.setWaitForTasksToCompleteOnShutdown(true);
        taskPool.initialize();
        return taskPool;
    }

    /**
     * Builds the handler that runs the worker step for each partition on the task executor.
     *
     * @return the partition handler, configured with {@code poolSize} as its grid size
     */
    @Bean(name = JOB_NAME + "_partitionHandler")
    public TaskExecutorPartitionHandler partitionHandler() {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler(); // (1)
        handler.setStep(step1()); // (2) worker step executed per partition
        handler.setTaskExecutor(executor()); // (3) threads the partitions run on
        handler.setGridSize(poolSize); // (4) grid size equals the pool size
        return handler;
    }

    /**
     * Defines the job: the partitioned manager step followed by the total-saving step.
     *
     * @return the job registered under {@link #JOB_NAME}
     */
    @Bean(JOB_NAME)
    public Job job() {
        // The "totalAmount" entry is initialized in the listener's beforeJob instead of here
        // (previously: dataShareBean.putData("totalAmount", 0L);).
        return jobBuilderFactory.get(JOB_NAME)
                .listener(listener())
                .start(step1Manager())
                .next(step2(null)) // null placeholder; step2 declares its argument via @Value
                //.preventRestart()
                .build();
    }

    /**
     * Creates the job listener, handing it the shared {@code DataShareBean}.
     *
     * @return the job execution listener
     */
    @Bean(BEAN_PREFIX + "listener")
    public JobExecutionListener listener() {
        return new TotalJobListener(dataShareBean);
    }

    /**
     * Builds the manager step that partitions the work and delegates to the worker step.
     *
     * @return the manager step named {@code "step1.manager"}
     */
    @Bean(name = JOB_NAME + "_step1Manager")
    public Step step1Manager() {
        // null is only a placeholder: the partitioner bean is @StepScope and declares its
        // argument via @Value (presumably resolved by late binding - confirm).
        PayIdRangePartitioner idRangePartitioner = partitioner(null);
        Step workerStep = step1();
        TaskExecutorPartitionHandler handler = partitionHandler();

        return stepBuilderFactory.get("step1.manager") // (1)
                .partitioner("step1", idRangePartitioner) // (2)
                .step(workerStep) // (3)
                .partitionHandler(handler) // (4)
                .build();
    }

    /**
     * Creates the step-scoped partitioner for the requested date.
     *
     * @param requestDate value of the {@code requestDate} job parameter
     * @return a partitioner backed by {@code payRepository}
     */
    @Bean(name = JOB_NAME + "_partitioner")
    @StepScope
    public PayIdRangePartitioner partitioner(
            @Value("#{jobParameters[requestDate]}") String requestDate) {
        return new PayIdRangePartitioner(payRepository, requestDate);
    }

    /**
     * Builds the worker step: reads {@code Pay} rows in chunks of {@code chunkSize} and hands
     * them to {@link #writer()}.
     *
     * @return the chunk-oriented worker step
     */
    @Bean(BEAN_PREFIX + "step")
    public Step step1() {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
                .<Pay, Pay>chunk(chunkSize)
                .reader(reader(null, null)) // null placeholders; reader declares them via @Value
                .writer(writer())
                .build();
    }

    /**
     * Creates the step-scoped paging reader for one partition's id range.
     *
     * @param minId lower id bound, read from the step execution context key {@code minId}
     * @param maxId upper id bound, read from the step execution context key {@code maxId}
     * @return a JPA paging reader selecting {@code Pay} rows with id BETWEEN minId AND maxId
     */
    @Bean(BEAN_PREFIX + "reader")
    @StepScope
    public JpaPagingItemReader<Pay> reader(
            @Value("#{stepExecutionContext[minId]}") Long minId,
            @Value("#{stepExecutionContext[maxId]}") Long maxId) {
        Map<String, Object> idRange = new HashMap<>();
        idRange.put("minId", minId);
        idRange.put("maxId", maxId);

        JpaPagingItemReaderBuilder<Pay> readerBuilder = new JpaPagingItemReaderBuilder<>();
        readerBuilder.name(BEAN_PREFIX + "reader");
        readerBuilder.entityManagerFactory(entityManagerFactory);
        readerBuilder.pageSize(chunkSize);
        readerBuilder.queryString("select p from Pay p where p.id BETWEEN :minId AND :maxId");
        readerBuilder.parameterValues(idRange);
        return readerBuilder.build();
    }

    /**
     * Creates the writer that passes every item's amount to
     * {@code dataShareBean.addData("totalAmount", ...)}.
     *
     * @return the item writer used by the worker step
     */
    @Bean(BEAN_PREFIX + "writer")
    public ItemWriter<Pay> writer() {
        return items -> items.forEach(
                pay -> dataShareBean.addData("totalAmount", pay.getAmount()));
    }

    /**
     * Builds the final step, which persists the value stored under {@code "totalAmount"}.
     *
     * @param requestDate value of the {@code requestDate} job parameter
     * @return a tasklet step that saves a {@code TotalPay} and finishes
     */
    @Bean(BEAN_PREFIX + "step2")
    @JobScope
    public Step step2(@Value("#{jobParameters[requestDate]}") String requestDate) {
        return stepBuilderFactory.get(BEAN_PREFIX + "step2")
                .tasklet((contribution, chunkContext) -> {
                    TotalPay totalPay =
                            new TotalPay(dataShareBean.getData("totalAmount"), requestDate);
                    totalPayRepository.save(totalPay);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
/**
 * Builds the handler that runs the worker step for each partition on the task executor.
 *
 * @return the partition handler, configured with {@code poolSize} as its grid size
 */
@Bean(name = JOB_NAME + "_partitionHandler")
public TaskExecutorPartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler(); // (1)
    handler.setStep(step1()); // (2) worker step executed per partition
    handler.setTaskExecutor(executor()); // (3) threads the partitions run on
    handler.setGridSize(poolSize); // (4) grid size equals the pool size
    return handler;
}
gridSize는 poolSize와 동일한 값으로 사용합니다.
@Bean(name = JOB_NAME+"_taskPool")
// Builds the fixed-size thread pool on which the partitions are executed:
// core and max pool size are both poolSize, and the executor is initialized before return.
public TaskExecutor executor() {
    ThreadPoolTaskExecutor taskPool = new ThreadPoolTaskExecutor();
    taskPool.setThreadNamePrefix("partition-thread");
    taskPool.setCorePoolSize(poolSize);
    taskPool.setMaxPoolSize(poolSize);
    taskPool.setWaitForTasksToCompleteOnShutdown(true);
    taskPool.initialize();
    return taskPool;
}
/**
 * Builds the manager step that partitions the work and delegates to the worker step.
 *
 * @return the manager step named {@code "step1.manager"}
 */
@Bean(name = JOB_NAME + "_step1Manager")
public Step step1Manager() {
    // null is only a placeholder: the partitioner bean is @StepScope and declares its
    // argument via @Value (presumably resolved by late binding - confirm).
    PayIdRangePartitioner idRangePartitioner = partitioner(null);
    Step workerStep = step1();
    TaskExecutorPartitionHandler handler = partitionHandler();

    return stepBuilderFactory.get("step1.manager") // (1)
            .partitioner("step1", idRangePartitioner) // (2)
            .step(workerStep) // (3)
            .partitionHandler(handler) // (4)
            .build();
}
/**
 * Creates the step-scoped partitioner for the requested date.
 *
 * @param requestDate value of the {@code requestDate} job parameter
 * @return a partitioner backed by {@code payRepository}
 */
@Bean(name = JOB_NAME + "_partitioner")
@StepScope
public PayIdRangePartitioner partitioner(
        @Value("#{jobParameters[requestDate]}") String requestDate) {
    PayIdRangePartitioner idRangePartitioner =
            new PayIdRangePartitioner(payRepository, requestDate); //(1)
    return idRangePartitioner;
}
/**
 * Partitioner that splits the id range of the {@code Pay} rows for one request date into
 * contiguous sub-ranges, one {@link ExecutionContext} per sub-range.
 *
 * <p>Each context carries the inclusive bounds under the keys {@code "minId"} and
 * {@code "maxId"}; the map keys are {@code "partition0"}, {@code "partition1"}, ...
 */
@Slf4j
@RequiredArgsConstructor
public class PayIdRangePartitioner implements Partitioner {

    private final PayRepository payRepository;
    private final String requestDate;

    /**
     * Splits the range between the repository's min and max id into at most
     * {@code gridSize} sub-ranges of equal width (the last one is clamped to the max id).
     *
     * @param gridSize requested number of partitions; must be positive
     * @return partition name to execution context; empty when the repository reports no
     *         min/max id for the request date
     * @throws IllegalArgumentException if {@code gridSize} is not positive
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Zero would divide by zero below; a negative value could make the step width
        // non-positive, so the loop would never advance.
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive: " + gridSize);
        }

        Long max = payRepository.findMaxId(requestDate);
        Long min = payRepository.findMinId(requestDate);
        log.info("max,min = {} {}", max,min);

        Map<String, ExecutionContext> result = new HashMap<>();
        // A null bound would fail on unboxing; treat it as "nothing to partition".
        if (max == null || min == null) {
            return result;
        }

        long maxId = max;
        long targetSize = (maxId - min) / gridSize + 1;
        long number = 0;
        for (long start = min; start <= maxId; start += targetSize) {
            // Inclusive upper bound of this slice, never beyond the overall max id.
            long end = Math.min(start + targetSize - 1, maxId);

            ExecutionContext value = new ExecutionContext();
            value.putLong("minId", start);
            value.putLong("maxId", end);
            result.put("partition" + number, value);
            log.info("partion min max = {} {}",start,end);
            number++;
        }
        return result;
    }
}
partition()은 Map<String, ExecutionContext>을 반환합니다.
반환된 Map의 각 항목이 Step Executions이 됩니다.
@Bean(BEAN_PREFIX + "reader")
// Step-scoped paging reader for one partition's id range.
// minId / maxId are read from the step execution context keys "minId" and "maxId";
// the reader selects Pay rows whose id is BETWEEN those two bounds, chunkSize rows per page.
@StepScope
public JpaPagingItemReader<Pay> reader(
        @Value("#{stepExecutionContext[minId]}") Long minId,
        @Value("#{stepExecutionContext[maxId]}") Long maxId) {
    Map<String, Object> idRange = new HashMap<>();
    idRange.put("minId", minId);
    idRange.put("maxId", maxId);

    JpaPagingItemReaderBuilder<Pay> readerBuilder = new JpaPagingItemReaderBuilder<>();
    readerBuilder.name(BEAN_PREFIX + "reader");
    readerBuilder.entityManagerFactory(entityManagerFactory);
    readerBuilder.pageSize(chunkSize);
    readerBuilder.queryString("select p from Pay p where p.id BETWEEN :minId AND :maxId");
    readerBuilder.parameterValues(idRange);
    return readerBuilder.build();
}
Step Executions에 있는 max id와 min id를 받아옵니다.
@RequiredArgsConstructor
// Job listener that resets the shared "totalAmount" entry to 0 before the job runs.
@Slf4j
public class TotalJobListener extends JobExecutionListenerSupport {

    // Parameterized (was a raw type) to match the DataShareBean<Long> the configuration injects.
    private final DataShareBean<Long> dataShareBean;

    /**
     * Stores {@code 0L} under {@code "totalAmount"} when the job execution is in status
     * {@code STARTED}; does nothing for any other status.
     *
     * @param jobExecution the execution that is about to run
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() != BatchStatus.STARTED) {
            return;
        }
        log.info("start Job & initial dataShareBean.totalAmount");
        dataShareBean.putData("totalAmount", 0L);
    }
}
dataShareBean은 등록할 때 DI 해줍니다.
spring:
datasource:
hikari:
maximumPoolSize : 20