[스프링배치] ItemReader, ItemProcessor ItemWriter 이용하기

hoyong.eom·2024년 1월 23일

스프링배치

목록 보기
3/12
post-thumbnail

ItemReader, ItemProcessor, ItemWriter

스프링 배치에서 데이터베이스에서 데이터를 읽어와 가공한 후에 다시 데이터베이스에 저장해야한다면 ItemReader, ItemProcessor, ItemWriter를 사용할 수 있다.

이 과정에 대한 예제 코드는 아래와 같다.

@Configuration
@RequiredArgsConstructor
public class TrMigrationConfig {

    @Autowired
    private OrdersRepository ordersRepository;


    @Autowired
    private AccountsRepository accountsRepository;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job trMigrationJob(Step trMigrationStep) {
        return jobBuilderFactory.get("trMigrationJob")
                .incrementer(new RunIdIncrementer())
                .start(trMigrationStep)
                .build();
    }

    @JobScope
    @Bean
    public Step trMigrationStep(ItemReader trOrdersReader, ItemProcessor trOrdersProcessor, ItemWriter trOrdersWriter) {
        return stepBuilderFactory.get("trMigrationStep")
//                5개의 데이터로 처리할거고 Orders 타입으로 읽어와서 Orders 타입으로 Write할거다.
//                chunk는 우리가 처리할 트랜잭션 단위임. 5개 단위로 커밋한다는것.
                .<Orders, Accounts>chunk((5))
                .reader(trOrdersReader)
//                .writer(new ItemWriter(){
//
//                    @Override
//                    public void write(Chunk chunk) throws Exception {
//                        List items = chunk.getItems();
//                        items.forEach(System.out::println);
//
//                    }
//                })
                .processor(trOrdersProcessor)
                .writer(trOrdersWriter)
                .build();
    }

    @Bean
    @StepScope
    public RepositoryItemWriter<Accounts> trOrdersWriter() {
        return new RepositoryItemReaderBuilder<Accounts>()
                .repository(accountsRepository)
                .methodName("save")
                .build();
    }

//    RepositoryItemWriter를 사용하지 않고도 ItemWrite로 가능함. 다만 내부 구현이 필요함.

    @Bean
    @StepScope
    public ItemWriter<Accounts> toOrderWriter2() {
        return new ItemWriter<Accounts>() {

            @Override
            public void write(Chunk<? extends Accounts> chunk) throws Exception {
                chunk.getItems().forEach(item -> accountsRepository.save(item));
            }
        };
    }


    @StepScope
    @Bean
    public ItemProcessor<Orders, Accounts> trOrderProcessor() {
        return new ItemProcessor<Orders, Accounts>() {
            @Override
            public Accounts process(Orders item) throws Exception {
                return Accounts(item);
            }
        }
    }
    
    @StepScope
    @Bean
    public RepositoryItemReader<Orders> trOrdersReader() {
        return new RepositoryItemReaderBuilder<Orders>()
                .name("trOrderReader")
                .repository(ordersRepository)
                .methodName("findAll")
                .pageSize(5)
                .arguments(Arrays.asList())
                .sorts(Collections.singletonMap("id", Sort.Direction.ASC))
                .build();

    }
}

@StepScope, @JobScope

이전 포스팅에서 공부했던것 처럼 StepScope와 JobScope는 해당 애노테이션이 붙여진 메서드가 실행될때 까지 Bean 생성을 지연시킨다.

.<Orders, Accounts>chunk((5))

trMigrationStep 함수에서 chunk 부분을 볼 수 있는데, 이 의미는 다음과 닽다.
Orders에 데이터를 읽어와서 Accounts로 저장한다. 그리고 트랜잭션의 단위는 5개씩 수행되어 커밋된다.

.reader(trOrdersReader)

reader에서는 ItemReader는 지정할 수 있다. 위 코드에서는 아래와 같이 RepositoryItemReader가 사용되었다.

    @StepScope
    @Bean
    public RepositoryItemReader<Orders> trOrdersReader() {
        return new RepositoryItemReaderBuilder<Orders>()
                .name("trOrderReader")
                .repository(ordersRepository)
                .methodName("findAll")
                .pageSize(5)
                .arguments(Arrays.asList())
                .sorts(Collections.singletonMap("id", Sort.Direction.ASC))
                .build();

    }

ItemReader로도 사용이 가능해보이지만, RepositoryItemReader가 확실히 편리해보인다.

.processor(trOrdersProcessor)

ItemProcessor로는 trOrdersProcessor가 사용되었다.

    @StepScope
    @Bean
    public ItemProcessor<Orders, Accounts> trOrderProcessor() {
        return new ItemProcessor<Orders, Accounts>() {
            @Override
            public Accounts process(Orders item) throws Exception {
                return Accounts(item);
            }
        }
    }

말그대로 Reader와 Writer 사이에 가공단계의 함수이며, 위 로직에서는 Orders를 Accounts로 마이그레이션 하는 과정이 포함되어 있다.

.writer(trOrdersWriter)

Reader와 유사하게 ItemWriter와 RepositoryItemWriter 모두 사용이 가능하다.

    @Bean
    @StepScope
    public RepositoryItemWriter<Accounts> trOrdersWriter() {
        return new RepositoryItemReaderBuilder<Accounts>()
                .repository(accountsRepository)
                .methodName("save")
                .build();
    }
    @Bean
    @StepScope
    public ItemWriter<Accounts> toOrderWriter2() {
        return new ItemWriter<Accounts>() {

            @Override
            public void write(Chunk<? extends Accounts> chunk) throws Exception {
                chunk.getItems().forEach(item -> accountsRepository.save(item));
            }
        };
    }

0개의 댓글