
이전글 - [Spring Batch 4편] FlatFileItemReader, FlatFileItemWriter로 파일 읽고 써보기
오늘은 JdbcPagingItemReader로 파일을 일고 JdbcBatchItemWriter로 파일 쓰는 실습을 진행해보자
실습 1. JdbcPagingItemReader로 DB 읽기
JdbcPagingItemReader를 사용하여 데이터베이스에서 데이터를 페이지 단위로 읽고, 읽어온 데이터를 csv파일로 저장하기
Spring Batch에서 제공하는 ItemReader로, 데이터베이스로부터 데이터를 페이지 단위로 읽어오는 데 사용된다.
DataSource: 데이터베이스 연결 정보 설정
SqlQuery: 데이터를 읽어올 SQL 쿼리를 설정
RowMapper: SQL 쿼리 결과를 특정 Item 객체로 변환
PageSize: 페이지 크기를 설정하여 한 번에 읽어올 데이터 양을 지정
SkippableItemReader: 오류 발생 시 해당 Item을 건너뜀
ReadListener: 읽기 시작, 종료, 오류 발생 등의 이벤트 처리
SaveStateCallback: 잡이 중단된 경우 현재 상태를 저장하여, 재시작 시 이어서 처리
데이터베이스에서 읽어온 데이터를 저장할 모델 클래스 (Customer 클래스)를 작성해주었다.
@Getter
@Setter
public class Customer {
private String name;
private int age;
private String gender;
}
JdbcPagingReaderJobConfig Class안에 데이터를 읽어오기 위한 SQL 쿼리를 정의하는 PagingQueryProvider를 작성해주었다.
@Autowired
DataSource dataSource;
@Bean
public PagingQueryProvider queryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource); // 데이터 소스를 설정하여 DB에 맞는 PagingQueryProvider 선택
queryProvider.setSelectClause("id, name, age, gender"); // 조회할 필드 지정
queryProvider.setFromClause("from customer"); // 조회할 테이블 설정
queryProvider.setWhereClause("where age >= :age"); // 조건절 설정
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.DESCENDING); // 정렬 기준 설정
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
}
setDataSource: 데이터 소스를 설정하여 데이터베이스와의 연결을 관리한다.setSelectClause: SELECT 구문에서 조회할 필드(id, name, age, gender)를 지정한다.setFromClause: 데이터를 가져올 테이블(customer)을 설정한다.setWhereClause: 특정 조건(age >= 20)을 통해 데이터를 필터링한다.setSortKeys: 정렬 기준(id 내림차순)을 지정하여 데이터의 정렬 방식을 설정한다.
JdbcPagingItemReader는 데이터베이스로부터 데이터를 페이지 단위로 읽어오는 역할을 한다. JdbcPagingReaderJobConfig Class 안에 JdbcPagingItemReader를 작성해주었다.
@Bean
public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception {
Map<String, Object> parameterValue = new HashMap<>();
parameterValue.put("age", 20);
return new JdbcPagingItemReaderBuilder<Customer>()
.name("jdbcPagingItemReader")
.fetchSize(CHUNK_SIZE)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.queryProvider(queryProvider())
.parameterValues(parameterValue)
.build();
}
- name: ItemReader 이름 설정
- fetchSize: 한 번에 읽어올 데이터의 개수를 설정하여 성능 최적화
- dataSource: DataSource를 설정하여 데이터베이스와의 연결 관리
- rowMapper: SQL 쿼리 결과를 Customer 객체로 변환하기 위해BeanPropertyRowMapper 사용
- queryProvider: PagingQueryProvider를 설정하여 페이지 단위의 데이터를 가져오는 쿼리 제공
- parameterValues: 쿼리에서 사용할 파라미터 값 설정
배치를 실행하기위한 기본적인 Job과 Step 설정이 추가된 전체코드이다.
package com.example.springbatch.jobs.JdbcPagingItemReader;
import com.example.springbatch.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class JdbcPagingReaderJobConfig {
public static final int CHUNK_SIZE = 2;
public static final String ENCODING = "UTF-8";
public static final String JDBC_PAGING_CHUNK_JOB = "JDBC_PAGING_CHUNK_JOB";
@Autowired
DataSource dataSource;
@Bean
public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception {
Map<String, Object> parameterValue = new HashMap<>();
parameterValue.put("age", 20);
return new JdbcPagingItemReaderBuilder<Customer>()
.name("jdbcPagingItemReader")
.fetchSize(CHUNK_SIZE)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.queryProvider(queryProvider())
.parameterValues(parameterValue)
.build();
}
@Bean
public PagingQueryProvider queryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource); // 데이터 소스를 설정하여 DB에 맞는 PagingQueryProvider 선택
queryProvider.setSelectClause("id, name, age, gender"); // 조회할 필드 지정
queryProvider.setFromClause("from customer"); // 조회할 테이블 설정
queryProvider.setWhereClause("where age >= :age"); // 조건절 설정
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.DESCENDING); // 정렬 기준 설정
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
@Bean
public FlatFileItemWriter<Customer> customerFlatFileItemWriter() {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerFlatFileItemWriter")
.resource(new FileSystemResource("./output/customer_new_v1.csv")) //출력 경로 지정
.encoding(ENCODING)
.delimited().delimiter("\t")
.names("Name", "Age", "Gender")
.build();
}
@Bean
public Step customerJdbcPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
log.info("------------------ Init customerJdbcPagingStep -----------------");
return new StepBuilder("customerJdbcPagingStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(jdbcPagingItemReader())
.writer(customerFlatFileItemWriter())
.build();
}
@Bean
public Job customerJdbcPagingJob(Step customerJdbcPagingStep, JobRepository jobRepository) {
log.info("------------------ Init customerJdbcPagingJob -----------------");
return new JobBuilder(JDBC_PAGING_CHUNK_JOB, jobRepository) //잡이름
.incrementer(new RunIdIncrementer())
.start(customerJdbcPagingStep)
.build();
}
}
이제 구현한 JdbcPagingItemReader를 실행하기위한 설정을 추가해보자.
하나의 프로젝트 안에 여러 Job을 구현한 경우, Spring Batch는 실행할 Job을 명확하게 지정해야 한다.
❗그렇지 않으면 Job name must be specified in case of multiple jobs 에러 메시지와 함께 에러가 발생하게 된다.❗
이를 해결하기 위해 application.yml 파일에 실행할 Job의 이름을 명시적으로 지정할 수 있다.
spring:
batch:
job:
name: JDBC_PAGING_CHUNK_JOB
enabled: true
아래와 같이 설정을 추가하여 위에서 작성한 JDBC_PAGING_CHUNK_JOB이라는 Job을 실행하도록 한다.
customer 테이블을 데이터베이스에 생성하여, JdbcPagingItemReader가 읽을 수 있는 데이터 구조를 만든다.
CREATE TABLE customer (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50),
age INT,
gender VARCHAR(10)
);
테이블을 생성한 후, 아래와 QueryConsole에 아래 내용을 입력하여 샘플 데이터를 삽입하였다.
INSERT INTO customer (name, age, gender) VALUES ('person1', 22, 'Female');
INSERT INTO customer (name, age, gender) VALUES ('person2', 35, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person3', 28, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person4', 17, 'Female');
INSERT INTO customer (name, age, gender) VALUES ('person5', 35, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person6', 56, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person7', 22, 'Female');
INSERT INTO customer (name, age, gender) VALUES ('person8', 11, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person9', 7, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('hayeon', 24, 'Female');

스프링부트 어플리케이션을 실행하면 우리가 설정한 출력 경로 ./output/customer_new_v1.csv에 출력 파일이 생성된 것을 확인할 수 있다.

customer_new_v1.csv 파일을 보면 Customer 테이블의 데이터가 저장되어있고 위 QueryProvider에서 설정한 조건(age >= 20, id 필드기준으로 내림차순 정렬)이 잘 적용되어 출력된 것을 확인할 수 있다.

방금 실행한 JOB이 잘 돌아갔나 확인하기 위해 Spring Batch의 메타데이터 스키마
BATCH_STEP_EXECUTION테이블을 확인해보자
✅CHUNK_SIZE를 2로 지정했기 때문에, JdbcPagingItemReader는 데이터를 한 번에 2개씩 읽어올 수 있다.
✅ BATCH_STEP_EXECUTION 테이블에서 내가 실행한 JOB의 READ_COUNT가 7로 표시된 것은 총 7개의 데이터를 읽었음을 확인할 수 있다.
✅ COMMIT_COUNT가 4로 나타난 것은 7개의 데이터를 2개씩 3번, 나머지 1개를 마지막으로 한 번 더 커밋하여 총 4번에 걸쳐 데이터를 처리했음을 확인할 수 있다.

실습 2. JdbcBatchItemWriter로 DB에 쓰기
JdbcBatchItemWriter를 이용하여 CSV 파일로부터 데이터를 읽어와 데이터베이스에 insert하기
JdbcBatchItemWriter는 Spring Batch에서 데이터베이스에 데이터를 저장하는 ItemWriter 구현체로 대량의 데이터를 JDBC를 통해 효율적으로 저장할 때 주로 사용된다.
DataSource: 데이터베이스 연결을 위한 정보 제공SqlStatementCreator: 데이터 저장 시 필요한 INSERT 쿼리를 생성PreparedStatementSetter: INSERT 쿼리에 필요한 파라미터 값을 설정ItemSqlParameterSourceProvider: Customer와 같은 객체의 필드를 쿼리에 전달할 파라미터로 변환장점
단점
JdbcBatchItemWriter는 데이터를 데이터베이스 테이블에 일괄적으로 저장하는 데 사용한다.
아래 코드 에서는 customer 테이블에 Customer 객체의 데이터를 저장하도록 한다.
JdbcBatchItemJobConfig Class 안에 JdbcBatchItemWriter를 작성해주었다.
@Bean
public JdbcBatchItemWriter<Customer> jdbcBatchItemWriter() {
return new JdbcBatchItemWriterBuilder<Customer>()
.dataSource(dataSource)
.sql("INSERT INTO customer (nam
e, age, gender) VALUES (:name, :age, :gender)")
.itemSqlParameterSourceProvider(new CustomerItemSqlParameterSourceProvider())
.build();
}
- dataSource: 데이터베이스에 연결하기 위한 정보 지정
- sql: 데이터를 삽입할 SQL 구문을 작성. :name, :age, :gender는 Customer 객체의 필드 값으로 매핑됨
- itemSqlParameterSourceProvider: Customer 객체의 필드를 SQL 파라미터로 변환하는 역할을 하는 클래스(
CustomerItemSqlParameterSourceProvider) 설정
CustomerItemSqlParameterSourceProvider는 Customer 객체의 필드를 SQL 쿼리의 파라미터로 변환해주는 역할을 한다.
CustomerItemSqlParameterSourceProvider Class 클래스를 새로 생성해 작성해주었다.
public class CustomerItemSqlParameterSourceProvider implements ItemSqlParameterSourceProvider<Customer> {
@Override
public SqlParameterSource createSqlParameterSource(Customer item) {
return new BeanPropertySqlParameterSource(item);
}
}
배치를 실행하기위한 기본적인 Job과 Step 설정이 추가된 전체코드이다.
package com.example.springbatch.jobs.JdbcPagingItemReader;
import com.example.springbatch.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Slf4j
@Configuration
public class JdbcBatchItemJobConfig {
public static final int CHUNK_SIZE = 100;
public static final String ENCODING = "UTF-8";
public static final String JDBC_BATCH_WRITER_CHUNK_JOB = "JDBC_BATCH_WRITER_CHUNK_JOB";
@Autowired
DataSource dataSource;
@Bean
public FlatFileItemReader<Customer> flatFileItemReader2() {
return new FlatFileItemReaderBuilder<Customer>()
.name("FlatFileItemReader")
.resource(new ClassPathResource("static/customer.csv")) //입력 경로
.encoding(ENCODING)
.delimited().delimiter(",")
.names("name", "age", "gender")
.targetType(Customer.class)
.build();
}
@Bean
public JdbcBatchItemWriter<Customer> jdbcBatchItemWriter() {
return new JdbcBatchItemWriterBuilder<Customer>()
.dataSource(dataSource)
.sql("INSERT INTO customer (name, age, gender) VALUES (:name, :age, :gender)")
.itemSqlParameterSourceProvider(new CustomerItemSqlParameterSourceProvider())
.build();
}
@Bean
public Step customerJdbcBatchingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
log.info("------------------ Init customerJdbcPagingStep -----------------");
return new StepBuilder("customerJdbcBatchingStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(flatFileItemReader2())
.writer(jdbcBatchItemWriter())
.build();
}
@Bean
public Job customerJdbcBatchingJob(Step customerJdbcBatchingStep, JobRepository jobRepository) {
log.info("------------------ Init customerJdbcBatchingStep -----------------");
return new JobBuilder(JDBC_BATCH_WRITER_CHUNK_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(customerJdbcBatchingStep)
.build();
}
}
application.yml 파일에 실행할 Job의 이름을 JDBC_BATCH_WRITER_CHUNK_JOB로 지정해주었다.
spring:
batch:
job:
name: JDBC_BATCH_WRITER_CHUNK_JOB
resources/static 디렉토리에 customer.csv파일을 추가해주었다.
unclebae,40,Male
superman,45,Male
WonderWoman,30,Female
스프링부트를 실행하면 customer 테이블에 새로운 데이터가 추가되었다.
기존 데이터 위에 CSV 파일로부터 읽어온 데이터가 정상적으로 삽입되었음을 확인할 수 있다.


오늘은 실습을 통해 JdbcPagingItemReader로 데이터베이스에서 데이터를 읽어와 파일로 저장하고, JdbcBatchItemWriter로 파일 데이터를 데이터베이스에 일괄 저장하는 방법에 대해 알아보았다.
참고글 - [SpringBatch 연재 05] JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기
깃허브 - https://github.com/hysong4u/springbatch
고생많았어요 ㅠㅠ 하연님이 발표했으면 더 깔끔했을것 같아요 ㅠㅠ