이전글 - [Spring Batch 4편] FlatFileItemReader, FlatFileItemWriter로 파일 읽고 써보기
오늘은 JdbcPagingItemReader로 파일을 일고 JdbcBatchItemWriter로 파일 쓰는 실습을 진행해보자
실습 1. JdbcPagingItemReader로 DB 읽기
JdbcPagingItemReader
를 사용하여 데이터베이스에서 데이터를 페이지 단위로 읽고, 읽어온 데이터를 csv파일로 저장하기
Spring Batch에서 제공하는 ItemReader로, 데이터베이스로부터 데이터를 페이지 단위로 읽어오는 데 사용된다.
DataSource
: 데이터베이스 연결 정보 설정
SqlQuery
: 데이터를 읽어올 SQL 쿼리를 설정
RowMapper
: SQL 쿼리 결과를 특정 Item 객체로 변환
PageSize
: 페이지 크기를 설정하여 한 번에 읽어올 데이터 양을 지정
SkippableItemReader
: 오류 발생 시 해당 Item을 건너뜀
ReadListener
: 읽기 시작, 종료, 오류 발생 등의 이벤트 처리
SaveStateCallback
: 잡이 중단된 경우 현재 상태를 저장하여, 재시작 시 이어서 처리
데이터베이스에서 읽어온 데이터를 저장할 모델 클래스 (Customer 클래스
)를 작성해주었다.
@Getter
@Setter
public class Customer {
private String name;
private int age;
private String gender;
}
JdbcPagingReaderJobConfig
Class안에 데이터를 읽어오기 위한 SQL 쿼리를 정의하는 PagingQueryProvider
를 작성해주었다.
@Autowired
DataSource dataSource;
@Bean
public PagingQueryProvider queryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource); // 데이터 소스를 설정하여 DB에 맞는 PagingQueryProvider 선택
queryProvider.setSelectClause("id, name, age, gender"); // 조회할 필드 지정
queryProvider.setFromClause("from customer"); // 조회할 테이블 설정
queryProvider.setWhereClause("where age >= :age"); // 조건절 설정
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.DESCENDING); // 정렬 기준 설정
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
}
setDataSource
: 데이터 소스를 설정하여 데이터베이스와의 연결을 관리한다.setSelectClause
: SELECT 구문에서 조회할 필드(id, name, age, gender)를 지정한다.setFromClause
: 데이터를 가져올 테이블(customer)을 설정한다.setWhereClause
: 특정 조건(age >= 20)을 통해 데이터를 필터링한다.setSortKeys
: 정렬 기준(id 내림차순)을 지정하여 데이터의 정렬 방식을 설정한다.
JdbcPagingItemReader는 데이터베이스로부터 데이터를 페이지 단위로 읽어오는 역할을 한다. JdbcPagingReaderJobConfig
Class 안에 JdbcPagingItemReader
를 작성해주었다.
@Bean
public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception {
Map<String, Object> parameterValue = new HashMap<>();
parameterValue.put("age", 20);
return new JdbcPagingItemReaderBuilder<Customer>()
.name("jdbcPagingItemReader")
.fetchSize(CHUNK_SIZE)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.queryProvider(queryProvider())
.parameterValues(parameterValue)
.build();
}
- name: ItemReader 이름 설정
- fetchSize: 한 번에 읽어올 데이터의 개수를 설정하여 성능 최적화
- dataSource: DataSource를 설정하여 데이터베이스와의 연결 관리
- rowMapper: SQL 쿼리 결과를 Customer 객체로 변환하기 위해BeanPropertyRowMapper 사용
- queryProvider: PagingQueryProvider를 설정하여 페이지 단위의 데이터를 가져오는 쿼리 제공
- parameterValues: 쿼리에서 사용할 파라미터 값 설정
배치를 실행하기위한 기본적인 Job과 Step 설정이 추가된 전체코드이다.
package com.example.springbatch.jobs.JdbcPagingItemReader;
import com.example.springbatch.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class JdbcPagingReaderJobConfig {
public static final int CHUNK_SIZE = 2;
public static final String ENCODING = "UTF-8";
public static final String JDBC_PAGING_CHUNK_JOB = "JDBC_PAGING_CHUNK_JOB";
@Autowired
DataSource dataSource;
@Bean
public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception {
Map<String, Object> parameterValue = new HashMap<>();
parameterValue.put("age", 20);
return new JdbcPagingItemReaderBuilder<Customer>()
.name("jdbcPagingItemReader")
.fetchSize(CHUNK_SIZE)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.queryProvider(queryProvider())
.parameterValues(parameterValue)
.build();
}
@Bean
public PagingQueryProvider queryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource); // 데이터 소스를 설정하여 DB에 맞는 PagingQueryProvider 선택
queryProvider.setSelectClause("id, name, age, gender"); // 조회할 필드 지정
queryProvider.setFromClause("from customer"); // 조회할 테이블 설정
queryProvider.setWhereClause("where age >= :age"); // 조건절 설정
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.DESCENDING); // 정렬 기준 설정
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
@Bean
public FlatFileItemWriter<Customer> customerFlatFileItemWriter() {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerFlatFileItemWriter")
.resource(new FileSystemResource("./output/customer_new_v1.csv")) //출력 경로 지정
.encoding(ENCODING)
.delimited().delimiter("\t")
.names("Name", "Age", "Gender")
.build();
}
@Bean
public Step customerJdbcPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
log.info("------------------ Init customerJdbcPagingStep -----------------");
return new StepBuilder("customerJdbcPagingStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(jdbcPagingItemReader())
.writer(customerFlatFileItemWriter())
.build();
}
@Bean
public Job customerJdbcPagingJob(Step customerJdbcPagingStep, JobRepository jobRepository) {
log.info("------------------ Init customerJdbcPagingJob -----------------");
return new JobBuilder(JDBC_PAGING_CHUNK_JOB, jobRepository) //잡이름
.incrementer(new RunIdIncrementer())
.start(customerJdbcPagingStep)
.build();
}
}
이제 구현한 JdbcPagingItemReader를 실행하기위한 설정을 추가해보자.
하나의 프로젝트 안에 여러 Job
을 구현한 경우, Spring Batch는 실행할 Job
을 명확하게 지정해야 한다.
❗그렇지 않으면 Job name must be specified in case of multiple jobs
에러 메시지와 함께 에러가 발생하게 된다.❗
이를 해결하기 위해 application.yml 파일에 실행할 Job의 이름을 명시적으로 지정할 수 있다.
spring:
batch:
job:
name: JDBC_PAGING_CHUNK_JOB
enabled: true
아래와 같이 설정을 추가하여 위에서 작성한 JDBC_PAGING_CHUNK_JOB
이라는 Job을 실행하도록 한다.
customer
테이블을 데이터베이스에 생성하여, JdbcPagingItemReader
가 읽을 수 있는 데이터 구조를 만든다.
CREATE TABLE customer (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50),
age INT,
gender VARCHAR(10)
);
테이블을 생성한 후, 아래와 QueryConsole에 아래 내용을 입력하여 샘플 데이터를 삽입하였다.
INSERT INTO customer (name, age, gender) VALUES ('person1', 22, 'Female');
INSERT INTO customer (name, age, gender) VALUES ('person2', 35, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person3', 28, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person4', 17, 'Female');
INSERT INTO customer (name, age, gender) VALUES ('person5', 35, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person6', 56, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person7', 22, 'Female');
INSERT INTO customer (name, age, gender) VALUES ('person8', 11, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person9', 7, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('hayeon', 24, 'Female');
스프링부트 어플리케이션을 실행하면 우리가 설정한 출력 경로 ./output/customer_new_v1.csv
에 출력 파일이 생성된 것을 확인할 수 있다.
customer_new_v1.csv
파일을 보면 Customer 테이블의 데이터가 저장되어있고 위 QueryProvider
에서 설정한 조건(age >= 20, id 필드기준으로 내림차순 정렬)이 잘 적용되어 출력된 것을 확인할 수 있다.
방금 실행한 JOB이 잘 돌아갔나 확인하기 위해 Spring Batch의 메타데이터 스키마
BATCH_STEP_EXECUTION
테이블을 확인해보자
✅CHUNK_SIZE
를 2로 지정했기 때문에, JdbcPagingItemReader
는 데이터를 한 번에 2개씩 읽어올 수 있다.
✅ BATCH_STEP_EXECUTION
테이블에서 내가 실행한 JOB의 READ_COUNT
가 7로 표시된 것은 총 7개의 데이터를 읽었음을 확인할 수 있다.
✅ COMMIT_COUNT
가 4로 나타난 것은 7개의 데이터를 2개씩 3번, 나머지 1개를 마지막으로 한 번 더 커밋하여 총 4번에 걸쳐 데이터를 처리했음을 확인할 수 있다.
실습 2. JdbcBatchItemWriter로 DB에 쓰기
JdbcBatchItemWriter
를 이용하여 CSV 파일로부터 데이터를 읽어와 데이터베이스에 insert하기
JdbcBatchItemWriter
는 Spring Batch에서 데이터베이스에 데이터를 저장하는 ItemWriter 구현체로 대량의 데이터를 JDBC를 통해 효율적으로 저장할 때 주로 사용된다.
DataSource
: 데이터베이스 연결을 위한 정보 제공SqlStatementCreator
: 데이터 저장 시 필요한 INSERT 쿼리를 생성PreparedStatementSetter
: INSERT 쿼리에 필요한 파라미터 값을 설정ItemSqlParameterSourceProvider
: Customer와 같은 객체의 필드를 쿼리에 전달할 파라미터로 변환장점
단점
JdbcBatchItemWriter
는 데이터를 데이터베이스 테이블에 일괄적으로 저장하는 데 사용한다.
아래 코드 에서는 customer
테이블에 Customer
객체의 데이터를 저장하도록 한다.
JdbcBatchItemJobConfig
Class 안에 JdbcBatchItemWriter
를 작성해주었다.
@Bean
public JdbcBatchItemWriter<Customer> jdbcBatchItemWriter() {
return new JdbcBatchItemWriterBuilder<Customer>()
.dataSource(dataSource)
.sql("INSERT INTO customer (nam![](https://velog.velcdn.com/images/hanni/post/3536ed28-3463-4df0-8a99-2ede702dc2dc/image.png)
e, age, gender) VALUES (:name, :age, :gender)")
.itemSqlParameterSourceProvider(new CustomerItemSqlParameterSourceProvider())
.build();
}
- dataSource: 데이터베이스에 연결하기 위한 정보 지정
- sql: 데이터를 삽입할 SQL 구문을 작성. :name, :age, :gender는 Customer 객체의 필드 값으로 매핑됨
- itemSqlParameterSourceProvider: Customer 객체의 필드를 SQL 파라미터로 변환하는 역할을 하는 클래스(
CustomerItemSqlParameterSourceProvider
) 설정
CustomerItemSqlParameterSourceProvider
는 Customer 객체의 필드를 SQL 쿼리의 파라미터로 변환해주는 역할을 한다.
CustomerItemSqlParameterSourceProvider
Class 클래스를 새로 생성해 작성해주었다.
public class CustomerItemSqlParameterSourceProvider implements ItemSqlParameterSourceProvider<Customer> {
@Override
public SqlParameterSource createSqlParameterSource(Customer item) {
return new BeanPropertySqlParameterSource(item);
}
}
배치를 실행하기위한 기본적인 Job과 Step 설정이 추가된 전체코드이다.
package com.example.springbatch.jobs.JdbcPagingItemReader;
import com.example.springbatch.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Slf4j
@Configuration
public class JdbcBatchItemJobConfig {
public static final int CHUNK_SIZE = 100;
public static final String ENCODING = "UTF-8";
public static final String JDBC_BATCH_WRITER_CHUNK_JOB = "JDBC_BATCH_WRITER_CHUNK_JOB";
@Autowired
DataSource dataSource;
@Bean
public FlatFileItemReader<Customer> flatFileItemReader2() {
return new FlatFileItemReaderBuilder<Customer>()
.name("FlatFileItemReader")
.resource(new ClassPathResource("static/customer.csv")) //입력 경로
.encoding(ENCODING)
.delimited().delimiter(",")
.names("name", "age", "gender")
.targetType(Customer.class)
.build();
}
@Bean
public JdbcBatchItemWriter<Customer> jdbcBatchItemWriter() {
return new JdbcBatchItemWriterBuilder<Customer>()
.dataSource(dataSource)
.sql("INSERT INTO customer (name, age, gender) VALUES (:name, :age, :gender)")
.itemSqlParameterSourceProvider(new CustomerItemSqlParameterSourceProvider())
.build();
}
@Bean
public Step customerJdbcBatchingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
log.info("------------------ Init customerJdbcPagingStep -----------------");
return new StepBuilder("customerJdbcBatchingStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(flatFileItemReader2())
.writer(jdbcBatchItemWriter())
.build();
}
@Bean
public Job customerJdbcBatchingJob(Step customerJdbcBatchingStep, JobRepository jobRepository) {
log.info("------------------ Init customerJdbcBatchingStep -----------------");
return new JobBuilder(JDBC_BATCH_WRITER_CHUNK_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(customerJdbcBatchingStep)
.build();
}
}
application.yml 파일에 실행할 Job의 이름을 JDBC_BATCH_WRITER_CHUNK_JOB
로 지정해주었다.
spring:
batch:
job:
name: JDBC_BATCH_WRITER_CHUNK_JOB
resources/static
디렉토리에 customer.csv
파일을 추가해주었다.
unclebae,40,Male
superman,45,Male
WonderWoman,30,Female
스프링부트를 실행하면 customer 테이블에 새로운 데이터가 추가되었다.
기존 데이터 위에 CSV 파일로부터 읽어온 데이터가 정상적으로 삽입되었음을 확인할 수 있다.
오늘은 실습을 통해 JdbcPagingItemReader
로 데이터베이스에서 데이터를 읽어와 파일로 저장하고, JdbcBatchItemWriter
로 파일 데이터를 데이터베이스에 일괄 저장하는 방법에 대해 알아보았다.
참고글 - [SpringBatch 연재 05] JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기
깃허브 - https://github.com/hysong4u/springbatch
고생많았어요 ㅠㅠ 하연님이 발표했으면 더 깔끔했을것 같아요 ㅠㅠ