이전글 - [Spring Batch 6편] JpaPagingItemReader, JpaItemWriter로 DB 읽고 쓰기
오늘은 MyBatisPagingItemReader로 파일을 일고 MyBatisItemWriter로 파일 쓰는 실습을 진행해보자
MyBatis는 Java 기반의 ORM 프레임워크 중 하나로 Hibernate와 달리 SQL 중심의 접근 방식이 특징이다.
MyBatisPagingItemReader는 Mybatis에서 제공하는 ItemReader 인터페이스를 구현하는 클래스이다.
장점
단점
SqlSessionFactory
: SqlSessionFactory를 통해 MyBatis와 연동QueryId
: setQueryId()를 통해 데이터를 읽을 SQL 쿼리 ID를 지정PageSize
: 페이징 단위를 지정 Spring Batch와 MyBatis를 통합하여 데이터베이스에서 데이터를 읽고 처리한 후 파일로 출력하는 과정을 구현해보자
MyBatis
를 활용하기 위해 프로젝트에 아래와 같은 의존성을 추가한다.
//mybatis
implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:3.0.2'
데이터베이스 테이블의 구조와 매핑되는Customer
객체를 작성한다.
@Data
public class Customer {
private Long id;
private String name;
private int age;
private String gender;
}
데이터를 읽기 위한 MyBatisPagingItemReader
를 설정한다. 페이징 처리를 통해 일정 크기(Chunk)만큼 데이터를 읽어온다.
@Bean
public MyBatisPagingItemReader<Customer> myBatisPagingItemReader() throws Exception {
return new MyBatisPagingItemReaderBuilder<Customer>()
.sqlSessionFactory(sqlSessionFactory) // MyBatis SqlSessionFactory 등록
.pageSize(CHUNK_SIZE) // 한 번에 읽어올 데이터 개수
.queryId("com.example.springbatch.jobs.MyBatisItemReader.selectCustomers") // Mapper 쿼리 ID 지정
.build();
}
❗queryId는 Mapper XML 파일의 쿼리 ID와 정확히 일치해야 한다.❗
ex) selectCustomers는 namespace와 함께com.example.springbatch.jobs.MyBatisItemReader.selectCustomers
로 설정된다.
MyBatisPagingItemReader
가 호출할 쿼리를 MyBatis Mapper XML 파일에 정의한다.
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.springbatch.jobs.MyBatisItemReader">
<resultMap id="customerResult" type="com.example.springbatch.jobs.models.Customer">
<result property="id" column="id"/>
<result property="name" column="name"/>
<result property="age" column="age"/>
<result property="gender" column="gender"/>
</resultMap>
<select id="selectCustomers" resultMap="customerResult">
SELECT id, name, age, gender
FROM customer
LIMIT #{_skiprows}, #{_pagesize}
</select>
</mapper>
namespace
: 쿼리를 식별하는 고유 식별자로 queryId
와 연결된다.
resultMap
: 쿼리 결과를 Customer 객체와 매핑한다.
쿼리
: LIMIT #{_skiprows}, #{_pagesize}를 활용하여 MyBatis에서 페이징 처리한다.
application.yml 설정 추가
mybatis:
mapper-locations: classpath:mappers/*.xml
application.yml 설정파일에는 mapper 클래스에 대한 경로를 추가해주었다.
ItemProcessor는 읽어온 데이터를 가공하는 역할을 한다. 여기서는 읽은 Customer 데이터를 그대로 반환하는 역할은 한다.
@Slf4j
public class CustomerItemProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer item) throws Exception {
log.info("Processing item: {}", item); // 읽은 데이터 확인
return item; // 그대로 반환
}
}
package com.example.springbatch.jobs.MyBatisItemReader;
import com.example.springbatch.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Slf4j
@Configuration
public class MyBatisReaderJobConfig {
public static final int CHUNK_SIZE = 2;
public static final String ENCODING = "UTF-8";
public static final String MYBATIS_PAGING_JOB = "MYBATIS_PAGING_JOB";
@Autowired
DataSource dataSource;
@Autowired
SqlSessionFactory sqlSessionFactory;
@Bean
public MyBatisPagingItemReader<Customer> myBatisPagingItemReader() throws Exception {
return new MyBatisPagingItemReaderBuilder<Customer>()
.sqlSessionFactory(sqlSessionFactory)
.pageSize(CHUNK_SIZE)
.queryId("com.example.springbatch.jobs.MyBatisItemReader.selectCustomers")
.build();
}
@Bean
public FlatFileItemWriter<Customer> customerFlatFileItemWriter2() {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerFlatFileItemWriter2")
.resource(new FileSystemResource("./output/customer_output.csv"))
.encoding(ENCODING)
.delimited()
.delimiter("\t")
.names("name", "age", "gender")
.build();
}
@Bean
public Step customerMyBatisPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
log.info("Initializing customerMyBatisPagingStep...");
return new StepBuilder("customerMyBatisPagingStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(myBatisPagingItemReader())
.processor(new CustomerItemProcessor())
.writer(customerFlatFileItemWriter2())
.build();
}
@Bean
public Job customerMyBatisPagingJob(Step customerMyBatisPagingStep, JobRepository jobRepository) {
log.info("Initializing customerMyBatisPagingJob...");
return new JobBuilder(MYBATIS_PAGING_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(customerMyBatisPagingStep)
.build();
}
}
application.yml에 실행할 JOB 이름을 지정해주었다.
spring:
batch:
jdbc:
initialize-schema: always
job:
name: MYBATIS_PAGING_JOB
MyBatisPagingItemReader
을 활용하여 customer 테이블에서 쿼리(selectCustomers)를 실행하였고, 여기서 읽어들인 데이터가 우리가 설정한 출력 경로 customer_output.csv
에 출력된 것을 확인할 수 있다.
MyBatisBatchItemWriter
는 Spring Batch에서 제공하는 ItemWriter 인터페이스를 구현한 클래스이다. MyBatis를 통해 데이터베이스에 저장하는 데 사용된다.
장점
단점
SqlSessionTemplate
: MyBatis의 SqlSession 생성을 관리하는 템플릿 객체
SqlSessionFactory
:SqlSessionTemplate 생성을 위한 팩토리 객체
StatementId
:실행할 MyBatis SQL Mapper의 Statement ID
ItemToParameterConverter
: 데이터를 매퍼 쿼리로 전달하기 위해 객체를 ParameterMap으로 변환하는 기능을 제공
MyBatisBatchItemWriter
를 사용하여 데이터를 MyBatis를 통해 데이터베이스에 저장하는 과정을 구현해 보자.
MyBatis를 사용하여 데이터를 데이터베이스에 저장하기 위한 MyBatisBatchItemWriter
를 작성한다.
@Bean
public MyBatisBatchItemWriter<Customer> mybatisItemWriter() {
return new MyBatisBatchItemWriterBuilder<Customer>()
.sqlSessionFactory(sqlSessionFactory) // SqlSessionFactory 등록
.statementId("com.example.springbatch.jobs.MyBatisItemReaderWriter.insertCustomers") // Mapper에서 실행할 쿼리 ID
.build();
}
sqlSessionFactory
: MyBatis와 데이터베이스를 연결하는 세션 팩토리.statementId
: MyBatis Mapper XML 파일의 쿼리 ID (insertCustomers)와 연결데이터베이스에 데이터를 저장하기 위한 INSERT 쿼리를 추가하였다.
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.springbatch.jobs.MyBatisItemReader">
<resultMap id="customerResult" type="com.example.springbatch.jobs.models.Customer">
<result property="id" column="id"/>
<result property="name" column="name"/>
<result property="age" column="age"/>
<result property="gender" column="gender"/>
</resultMap>
<!-- customer 테이블에 저장하는 INSERT 쿼리 -->
<insert id="insertCustomers" parameterType="com.example.springbatch.jobs.models.Customer">
INSERT INTO customer(name, age, gender)
VALUES (#{name}, #{age}, #{gender});
</insert>
</mapper>
FlatFileItemReader로 읽어올 CSV 파일을 작성해주었다.
id,name,age,gender
1,person1,22,Female
2,person2,35,Male
3,person3,28,Male
package com.example.springbatch.jobs.MyBatisItemReader;
import com.example.springbatch.jobs.models.Customer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MyBatisBatchWriterJobConfig {
private static final int CHUNK_SIZE = 10;
private static final String JOB_NAME = "MYBATIS_BATCH_JOB";
private final SqlSessionFactory sqlSessionFactory;
@Bean
public FlatFileItemReader<Customer> customerFileReader() {
log.info("Initializing FlatFileItemReader...");
return new FlatFileItemReaderBuilder<Customer>()
.name("customerFileReader")
.resource(new ClassPathResource("static/customer.csv"))
.delimited()
.delimiter(",")
.names("id", "name", "age", "gender")
.linesToSkip(1) // header 건너뛰기 추가해야함
.fieldSetMapper(fields -> {
log.info("Reading fields: {}", fields);
Customer customer = new Customer();
customer.setId(fields.readLong("id"));
customer.setName(fields.readString("name"));
customer.setAge(fields.readInt("age"));
customer.setGender(fields.readString("gender"));
return customer;
})
.build();
}
@Bean
public MyBatisBatchItemWriter<Customer> mybatisItemWriter() {
return new MyBatisBatchItemWriterBuilder<Customer>()
.sqlSessionFactory(sqlSessionFactory)
.statementId("com.example.springbatch.jobs.MyBatisItemReader.insertCustomers")
.build();
}
@Bean
public Step customerBatchStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("customerBatchStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(customerFileReader())
.processor(customer -> {
log.info("Processing customer: {}", customer);
return customer;
})
.writer(mybatisItemWriter())
.build();
}
@Bean
public Job customerBatchJob(Step customerBatchStep, JobRepository jobRepository) {
return new JobBuilder(JOB_NAME, jobRepository)
.incrementer(new RunIdIncrementer())
.start(customerBatchStep)
.build();
}
}
❗
FlatFileItemReader
에서 csv파일을 읽어올 때 linesToSkip(1)를 추가하여 CSV 파일에서 첫 번째 줄(헤더)을 무시하도록 설정하였다❗
application.yml에 실행할 JOB 이름을 지정해주었다.
spring:
batch:
jdbc:
initialize-schema: always
job:
name: MYBATIS_BATCH_JOB
CSV 파일의 데이터가 데이터베이스의 customer 테이블에 저장된 것을 확인할 수 있다.
오늘은 실습을 통해 MyBatisPagingItemReader
로 데이터베이스에서 데이터를 읽어와 파일로 저장하고, MyBatisItemWriter
로 파일 데이터를 데이터베이스에 일괄 저장하는 방법에 대해 알아보았다.
참고글 - [SpringBatch 연재 07] MyBatisPagingItemReader로 DB내용을 읽고, MyBatisItemWriter로 DB에 쓰기
깃허브 - https://github.com/hysong4u/springbatch