
스프링 배치에서 데이터베이스에서 데이터를 읽어와 가공한 후에 다시 데이터베이스에 저장해야한다면 ItemReader, ItemProcessor, ItemWriter를 사용할 수 있다.
이 과정에 대한 예제 코드는 아래와 같다.
@Configuration
@RequiredArgsConstructor
public class TrMigrationConfig {
@Autowired
private OrdersRepository ordersRepository;
@Autowired
private AccountsRepository accountsRepository;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job trMigrationJob(Step trMigrationStep) {
return jobBuilderFactory.get("trMigrationJob")
.incrementer(new RunIdIncrementer())
.start(trMigrationStep)
.build();
}
@JobScope
@Bean
public Step trMigrationStep(ItemReader trOrdersReader, ItemProcessor trOrdersProcessor, ItemWriter trOrdersWriter) {
return stepBuilderFactory.get("trMigrationStep")
// 5개의 데이터로 처리할거고 Orders 타입으로 읽어와서 Orders 타입으로 Write할거다.
// chunk는 우리가 처리할 트랜잭션 단위임. 5개 단위로 커밋한다는것.
.<Orders, Accounts>chunk((5))
.reader(trOrdersReader)
// .writer(new ItemWriter(){
//
// @Override
// public void write(Chunk chunk) throws Exception {
// List items = chunk.getItems();
// items.forEach(System.out::println);
//
// }
// })
.processor(trOrdersProcessor)
.writer(trOrdersWriter)
.build();
}
@Bean
@StepScope
public RepositoryItemWriter<Accounts> trOrdersWriter() {
return new RepositoryItemReaderBuilder<Accounts>()
.repository(accountsRepository)
.methodName("save")
.build();
}
// RepositoryItemWriter를 사용하지 않고도 ItemWrite로 가능함. 다만 내부 구현이 필요함.
@Bean
@StepScope
public ItemWriter<Accounts> toOrderWriter2() {
return new ItemWriter<Accounts>() {
@Override
public void write(Chunk<? extends Accounts> chunk) throws Exception {
chunk.getItems().forEach(item -> accountsRepository.save(item));
}
};
}
@StepScope
@Bean
public ItemProcessor<Orders, Accounts> trOrderProcessor() {
return new ItemProcessor<Orders, Accounts>() {
@Override
public Accounts process(Orders item) throws Exception {
return Accounts(item);
}
}
}
@StepScope
@Bean
public RepositoryItemReader<Orders> trOrdersReader() {
return new RepositoryItemReaderBuilder<Orders>()
.name("trOrderReader")
.repository(ordersRepository)
.methodName("findAll")
.pageSize(5)
.arguments(Arrays.asList())
.sorts(Collections.singletonMap("id", Sort.Direction.ASC))
.build();
}
}
이전 포스팅에서 공부했던것 처럼 StepScope와 JobScope는 해당 애노테이션이 붙여진 메서드가 실행될때 까지 Bean 생성을 지연시킨다.
trMigrationStep 함수에서 chunk 부분을 볼 수 있는데, 이 의미는 다음과 닽다.
Orders에 데이터를 읽어와서 Accounts로 저장한다. 그리고 트랜잭션의 단위는 5개씩 수행되어 커밋된다.
reader에서는 ItemReader는 지정할 수 있다. 위 코드에서는 아래와 같이 RepositoryItemReader가 사용되었다.
@StepScope
@Bean
public RepositoryItemReader<Orders> trOrdersReader() {
return new RepositoryItemReaderBuilder<Orders>()
.name("trOrderReader")
.repository(ordersRepository)
.methodName("findAll")
.pageSize(5)
.arguments(Arrays.asList())
.sorts(Collections.singletonMap("id", Sort.Direction.ASC))
.build();
}
ItemReader로도 사용이 가능해보이지만, RepositoryItemReader가 확실히 편리해보인다.
ItemProcessor로는 trOrdersProcessor가 사용되었다.
@StepScope
@Bean
public ItemProcessor<Orders, Accounts> trOrderProcessor() {
return new ItemProcessor<Orders, Accounts>() {
@Override
public Accounts process(Orders item) throws Exception {
return Accounts(item);
}
}
}
말그대로 Reader와 Writer 사이에 가공단계의 함수이며, 위 로직에서는 Orders를 Accounts로 마이그레이션 하는 과정이 포함되어 있다.
Reader와 유사하게 ItemWriter와 RepositoryItemWriter 모두 사용이 가능하다.
@Bean
@StepScope
public RepositoryItemWriter<Accounts> trOrdersWriter() {
return new RepositoryItemReaderBuilder<Accounts>()
.repository(accountsRepository)
.methodName("save")
.build();
}
@Bean
@StepScope
public ItemWriter<Accounts> toOrderWriter2() {
return new ItemWriter<Accounts>() {
@Override
public void write(Chunk<? extends Accounts> chunk) throws Exception {
chunk.getItems().forEach(item -> accountsRepository.save(item));
}
};
}