이전글 - [Spring Batch 7편] MyBatisPagingItemReader, MyBatisItemWriter로 DB 읽고 쓰기
CompositeItemProcessor
는 Spring Batch에서 제공하는 클래스이며, 여러 개의 ItemProcessor를 하나로 결합하여 순차적으로 데이터를 처리할 수 있게 한다.
=> 이전 ItemProcessor
의 출력값은 다음 ItemProcessor
의 입력값으로 전달되며, 여러 처리 단계를 하나의 프로세서로 묶어 활용할 수 있다.
장점
- 단계별 처리
데이터를 처리하는 과정을 여러 단계로 나누어 코드를 명확하게 관리할 수 있다.- 재사용 가능성
각 ItemProcessor를 다른 Job에서도 재사용할 수 있다.- 유연성
다양한 ItemProcessor를 조합하여 유연한 데이터 처리 로직을 구현할 수 있다.
단점
- 설정 복잡성
여러 ItemProcessor를 설정하고 관리해야 하므로 복잡해질 수 있다.- 성능 저하
처리 단계가 많아질수록 성능이 떨어질 가능성이 있다.
LowerCaseItemProcessor
에 ItemProcessor를 구현하여 이름과 성별을 소문자로 변경하는 프로세서를 작성한다.
public class LowerCaseItemProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer item) throws Exception {
item.setName(item.getName().toLowerCase()); // 이름 소문자 변환
item.setGender(item.getGender().toLowerCase()); // 성별 소문자 변환
return item;
}
}
After20YearsItemProcessor
클래스에 ItemProcessor를 구현하여 나이에 20년을 더하는 프로세서를 작성한다.
public class After20YearsItemProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer item) throws Exception {
item.setAge(item.getAge() + 20); // 나이에 20 추가
return item;
}
}
CompositeItemProcessor
에 위 두 개의 ItemProcessor를 결합하여 순차적으로 실행되도록 설정했다.
@Bean
public CompositeItemProcessor<Customer, Customer> compositeItemProcessor() {
return new CompositeItemProcessorBuilder<Customer, Customer>()
.delegates(List.of(
new LowerCaseItemProcessor(),
new After20YearsItemProcessor()
))
.build();
}
MyBatisItemReader에서 사용할 SQL 쿼리를 customer.xml에 정의 해주었다.
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.springbatch.jobs.MyBatisItemReader">
<resultMap id="customerResult" type="com.example.springbatch.jobs.models.Customer">
<result property="id" column="id"/>
<result property="name" column="name"/>
<result property="age" column="age"/>
<result property="gender" column="gender"/>
</resultMap>
<!-- Customer 테이블에서 데이터를 조회하는 SELECT 쿼리 -->
<select id="selectCustomers" resultMap="customerResult">
SELECT id, name, age, gender
FROM customer
ORDER BY id
</select>
<!-- customer 테이블에 저장하는 INSERT 쿼리 -->
<insert id="insertCustomers" parameterType="com.example.springbatch.jobs.models.Customer">
INSERT INTO customer(name, age, gender)
VALUES (#{name}, #{age}, #{gender});
</insert>
</mapper>
데이터를 customer 테이블에서 읽어오는MyBatisPagingItemReader
를 작성해주었다
@Bean
public CompositeItemProcessor<Customer, Customer> compositeItemProcessor() {
return new CompositeItemProcessorBuilder<Customer, Customer>()
.delegates(List.of(
new LowerCaseItemProcessor(),
new After20YearsItemProcessor()
))
.build();
}
package com.example.springbatch.jobs.MyBatisItemReader;
import com.example.springbatch.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.builder.CompositeItemProcessorBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.List;
@Slf4j
@Configuration
public class MyBatisPagingReaderJobConfig {
public static final int CHUNK_SIZE = 2;
public static final String ENCODING = "UTF-8";
public static final String MYBATIS_PAGING_JOB = "MYBATIS_PAGING_JOB";
@Autowired
DataSource dataSource;
@Autowired
SqlSessionFactory sqlSessionFactory;
@Bean
public MyBatisPagingItemReader<Customer> myBatisPagingItemReader() throws Exception {
return new MyBatisPagingItemReaderBuilder<Customer>()
.sqlSessionFactory(sqlSessionFactory)
.pageSize(CHUNK_SIZE)
.queryId("com.example.springbatch.jobs.MyBatisItemReader.selectCustomers")
.build();
}
@Bean
public CompositeItemProcessor<Customer, Customer> compositeItemProcessor() {
return new CompositeItemProcessorBuilder<Customer, Customer>()
.delegates(List.of(
new LowerCaseItemProcessor(),
new After20YearsItemProcessor()
))
.build();
}
@Bean
public FlatFileItemWriter<Customer> customerFlatFileItemWriter2() {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerFlatFileItemWriter2")
.resource(new FileSystemResource("./output/customer_output_v4.csv"))
.encoding(ENCODING)
.delimited()
.delimiter("\t")
.names("name", "age", "gender")
.build();
}
@Bean
public Step customerMyBatisPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
log.info("Initializing customerMyBatisPagingStep...");
return new StepBuilder("customerMyBatisPagingStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(myBatisPagingItemReader())
.processor(compositeItemProcessor())
.writer(customerFlatFileItemWriter2())
.build();
}
@Bean
public Job customerMyBatisPagingJob(Step customerMyBatisPagingStep, JobRepository jobRepository) {
log.info("Initializing customerMyBatisPagingJob...");
return new JobBuilder(MYBATIS_PAGING_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(customerMyBatisPagingStep)
.build();
}
}
실행할 job을 지정해주었다.
job:
name: MYBATIS_PAGING_JOB
enabled: true
참고글 - [SpringBatch 연재 08] CompositeItemProcessor 으로 여러단계에 걸쳐 데이터 Transform하기
깃허브 - https://github.com/hysong4u/springbatch
항상 열심히 꾸준히 시간 잘 지키며 해줘서 감사해요 ㅠㅠ 끝까지 달려보아요 ㅠㅠ