이번 글에서는 배치를 통해 DB에서 데이터를 읽어오는 여러 ItemReader들에 대해 알아볼겠습니다. 스프링 배치에서 DB로부터 데이터를 읽어올 때엔 Cursor와 Paging 두 방식이 있어 우선 이것에 대한 이해가 필요합니다.
Cursor 방식은 자신이 데이터를 가져올 때 어디까지 가져오는 지 그 위치를 알고 있습니다. 그리고 기본적으로 1개의 데이터씩 가져옵니다. 그러니까 만약 10개의 데이터를 가지고 오려면 10번의 I/O가 발생하게 되고, 때에 따라서는 10번의 connection의 연결/종료가 발생할 수도 있는 것이죠. 그런데 fetchSize를 지정해주면 이 크기만큼 데이터를 한 번에 가져오게 되고, 만약 10을 지정해주면 한 번에 10개를 가져오게 되는 것이죠. 그리고 다음 지점부터 또 10 개의 데이터를 읽어오게 됩니다. 이 방식은 모든 Cursor 작업이 끝나기 전까지 DB 커넥션은 물고 있기에 ConnectionTime이나 SocketTimeout을 오래 잡아주어야 하고, 또 모든 데이터를 메모리에 올리기 때문에 메모리 사용량이 커지게 됩니다.
Paging 기반으로 데이터를 읽을 땐 offset과 limit을 주어 어디서부터 몇 개의 데이터를 DB로 부터 가져올 지 설정합니다. 해당 쿼리가 실행될 동안만 DB 커넥션을 물고 있기 때문에 짧은 시간 내에 DB 커넥션을 반환하고, Paging 단위의 결과만 가져오기 때문에 메모리 공간을 효율적으로 사용합니다.
Cursor 기반의 ItemReader의 경우 JdbcCursorItemReader와 JpaCursorItemReader 두 가지가 있습니다. 이 중 JdbcCursorItemReader에 대해 살펴보겠습니다.
JdbcCursorItemReader는 jdcb를 이용해 DB와 연동하는 방식입니다. 예전에 본 connection, preparedStatement, resultSet을 사용하는 방식이죠. 내부적으로 ItemStream을 사용합니다. 코드로 볼텐데요, 역시 빌더를 사용해 간단하게 구현할 수 있습니다. 주의할 점은 fetchSize() API를 통해 한 번에 가져올 데이터의 수와 chunk() API에서 지정한 사이즈를 동일하게 맞추는 게 좋습니다.
@Configuration
@RequiredArgsConstructor
public class JobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private final String sql1 = "SELECT * FROM customer WHERE firstName like ? ORDER BY lastName, firstName";
private final String sql2 = "SELECT * FROM customer ORDER BY lastName, firstName";
private final String arg = "A%";
@Bean
public Job batchjob() {
return jobBuilderFactory.get("batchjob")
.start(step1())
.build();
}
@Bean
public ItemReader<Customer> jdcbCursorItemReader() {
return new JdbcCursorItemReaderBuilder<Customer>()
.name("jdcbCursorItemReader")
.fetchSize(10) // 한 번에 가져올 데이터 수
.sql(sql1) // 실행할 sql문
.beanRowMapper(Customer.class) // 변환할 클래스
.queryArguments(arg) // sql문 내 인자
.dataSource(dataSource) // DB 설정
.build();
}
@Bean
public ItemWriter<Customer> itemWriter() {
return new CustomItemWriter();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(10)
.reader(jdcbCursorItemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
}
이제 JpaCursorItemReader에 대해 살펴볼텐데요, JpaCursorItemReader는 조금 더 단순합니다. DataSource가 아닌 EntityManager를 설정해주는 차이가 있습니다. 또한 별도로 fetchSize를 설정하지 않고 Step에서 설정한 chunkSize를 기준으로 처리합니다. 코드로 보겠습니다.
@Configuration
@RequiredArgsConstructor
public class JobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
private final String sql3 = "SELECT c FROM Customer c WHERE firstName like :firstName ORDER BY lastName, firstName";
private final String sql4 = "SELECT c FROM Customer c";
@Bean
public Job batchjob() {
return jobBuilderFactory.get("batchjob")
.start(step1())
.build();
}
@Bean
public ItemReader<Customer> jpaCursorItemReader() {
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("firstName", "A%");
return new JpaCursorItemReaderBuilder<Customer>()
.name("jpaCursorItemReader")
.entityManagerFactory(entityManagerFactory) // EntityManager 설정
.queryString(sql3) // 실행할 jpql문
.parameterValues(parameters) // jpql문 내 인자
.build();
}
@Bean
public ItemWriter<Customer> itemWriter() {
return new CustomItemWriter();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(10)
.reader(jpaCursorItemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
}
이제 Paging 기반의 ItemReader를 살펴볼텐데요, 종류는 위와 동일하게 JdbcPagingItemReader, JpaPagingItemReader 두 가지가 있습니다. 이 중 JdbcPagingItemReader를 먼저 살펴보겠습니다. pageSize를 설정해주고, 매핑할 클래스를 설정해준 다음 PagingQueryProvider를 설정해줍니다. 간단하게 코드만 보겠습니다.
@Configuration
@RequiredArgsConstructor
public class JobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@Bean
public Job batchjob() {
return jobBuilderFactory.get("batchjob")
.start(step1())
.build();
}
@Bean
public ItemReader<Customer> jdbcPagingItemReader() throws Exception {
return new JdbcPagingItemReaderBuilder<Customer>()
.name("jdcbPagingItemReader")
.pageSize(10) // 사이즈 설정
.dataSource(dataSource) // DB 설정
.rowMapper(new BeanPropertyRowMapper<>(Customer.class)) // 매핑할 클래스 설정
.queryProvider(pagingQueryProvider()) // PagingQueryProvider 설정
.build();
}
@Bean
public PagingQueryProvider pagingQueryProvider() throws Exception {
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause("*");
queryProvider.setFromClause("FREOM customer");
queryProvider.setWhereClause("WHERE firstName like :firstName");
queryProvider.setSortKeys(sortKeys); // 정렬 설정
return queryProvider.getObject();
}
@Bean
public ItemWriter<Customer> itemWriter() {
return new CustomItemWriter();
}
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(10)
.reader(jdbcPagingItemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
}
이제 JpaPagingItemReader에 대해 살펴볼텐데요, 이 역시 위에서 본 JpaCursorItemReader와 비슷합니다. 단지 Paging 방식이기 때문에 가져올 데이터의 크기를 지정해주어야 합니다.
@Configuration
@RequiredArgsConstructor
public class JobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
private final String sql3 = "SELECT c FROM Customer c WHERE firstName like :firstName ORDER BY lastName, firstName";
private final String sql4 = "SELECT c FROM Customer c";
@Bean
public Job batchjob() {
return jobBuilderFactory.get("batchjob")
.start(step1())
.build();
}
@Bean
public ItemReader<Customer> jpaPagingItemReader() {
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("firstName", "A%");
return new JpaPagingItemReaderBuilder<Customer>()
.name("jpaCursorItemReader")
.pageSize(10) // 가져올 데이터 갯수
.entityManagerFactory(entityManagerFactory) // EntityManager 설정
.queryString(sql3) // 실행할 jpql문
.parameterValues(parameters) // jpql문 내 인자
.build();
}
@Bean
public ItemWriter<Customer> itemWriter() {
return new CustomItemWriter();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(10)
.reader(jpaPagingItemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
}
이번 글에서는 DB에서 데이터를 읽어오는 ItemReader의 4가지 구현체들을 알아봤습니다. 다음 글에서는 ItemWriter의 구현체들에 대해 알아보겠습니다.