Cursor 방식은 DB와 커넥션을 맺은 후, Cursot를 한칸씩 옮기면서 지속적으로 데이터를 가져옵니다. DB와 어플리케이션 사이 통로를 하나 연결해서 하나씩 데이터를 가져온다고 생각하면 됩니다.
아래 예제코드는 jojoldu님의 블로그의 스프링 배치 글을 보고 따라서 작성해본 예제코드 입니다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcCursorItemReaderJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
private static final int chunkSize = 10;
@Bean
public Job jobCursorItemReaderJob() {
return jobBuilderFactory.get("jdbcCursorItemReaderJob")
.start(jdbcCursorItemReaderStep())
.build();
}
// ItemReader에서 반환하는 타입은 Pay, Writer에서 저장하는 타입도 Pay입니다.
@Bean
public Step jdbcCursorItemReaderStep() {
return stepBuilderFactory.get("jdbcCursorItemReaderStep")
.<Pay, Pay> chunk(chunkSize)
.reader(jdbcCursorItemReader())
.writer(jdbcCursorItemWriter())
.build();
}
@Bean
public JdbcCursorItemReader<Pay> jdbcCursorItemReader() {
return new JdbcCursorItemReaderBuilder<Pay>()
.fetchSize(chunkSize)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Pay.class))
.sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
.name("jdbcCursorItemReader")
.build();
}
@Bean
public ItemWriter<Pay> jdbcCursorItemWriter() {
return list -> {
for (Pay pay: list) {
log.info("Current Pay= {}", pay);
}
};
}
}
@ToString
@Setter
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Entity
public class Pay {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long amount;
private String txName;
private LocalDateTime txDateTime;
private Pay(Long amount, String txName, LocalDateTime txDateTime) {
this.amount = amount;
this.txName = txName;
this.txDateTime = txDateTime;
}
private Pay(Long id, Long amount, String txName, LocalDateTime txDateTime) {
this.id = id;
this.amount = amount;
this.txName = txName;
this.txDateTime = txDateTime;
}
public static Pay of(Long amount, String txName, LocalDateTime txDateTime) {
return new Pay(amount, txName, txDateTime);
}
}
실제로 AbstractItemCountingItemStreamItemReader.read() 메서드를 호출하면 하위 추상 클래스인 AbstractCursorItemReader.doRead() 메서드가 호출 됩니다.
@Nullable
@Override
protected T doRead() throws Exception {
if (rs == null) {
throw new ReaderNotOpenException("Reader must be open before it can be read.");
}
try {
if (!rs.next()) {
return null;
}
int currentRow = getCurrentItemCount();
T item = readCursor(rs, currentRow);
verifyCursorPosition(currentRow);
return item;
}
catch (SQLException se) {
throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
}
}
템플릿 메서드 패턴을 사용하기 때문에 readCursor()
메서드는 실제로 비어있고, 하위 클래스인 JdbcCursorItemReader에서 구현하고 있습니다. 하나씩 DB에서 ResultSet을 open해서 읽어오고 있습니다.
@Nullable
@Override
protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
return rowMapper.mapRow(rs, currentRow);
}
JdbcCursorItemReaderBuilder를 통해서 JdbcCursorItemReader 구현체를 생성하기 위해 다양한 메서드를 호출하고 있는데 하나씩 살펴보겠습니다. 각 역할들을 살펴보겠습니다.
ResultSet의 동작 과정을 알아야하는데 최초로 ResultSet.next() 메서드를 호출 시 한 꺼번에 fetchSize 만큼 DB에서 가져와 메모리에 저장합니다. 그 다음 read() 메서드로 메모리에서 하나씩 읽어서 처리합니다.
dataSource: Database에 접근하기 위해 사용할 Datasource 객체를 할당합니다.
rowMapper: 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper 입니다. 커스텀하기 보다는 매번 Mapper 클래스를 생성해야 하기 때문에 보편적으로 Spring에서 공식적으로 지원하는 BeanPropertyRowMapper를 많이 사용합니다.
name: ItemReader의 이름을 지정합니다. Bean의 이름이 아니고 Spring Batch의 ExecutionContext에 저장되어질 이름입니다.
ItemWriter는 Chunk 크기의 List 객체를 받아서 간단히 로그만 찍도록 처리하였습니다.
CursorItemReader를 사용할 때는 DB와 SocketTimeout을 충분히 큰 값으로 설정해야 합니다. 기본적으로 TCP 통신은 Socket으로 하기 때문에 타임아웃을 설정해줘야 합니다. Cursor는 하나의 Connection으로 Batch가 끝날때까지 사용되기 때문에 작업이 다 끝나기전에 Connection이 먼저 끊어질 수 있습니다.
Batch 수행 시간이 오래 걸리는 경우에 권장하는 방법은
PagingItemReader를 사용하는 것입니다.
Paging의 경우한 페이지를 읽을 때마다 Connection을 맺고 끊기 때문에 아무리 많은 데이터라도 타임아웃과 부하 없이 수행될 수 있습니다.
Paging 방식은 한번에 10개 혹은 개발자가 지정한 pageSize만큼 데이터를 가져옵니다.
JdbcPagingItemReader는 JdbcCursorItemReader와 같은 JdbcTemplate 인터페이스를 이용한 PagingItemReader 입니다.
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcPagingItemReaderConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource; // DataSource DI
private static final int chunkSize = 10;
@Bean
public Job jdbcPagingItemReaderJob() throws Exception {
return jobBuilderFactory.get("jdbcPagingItemReaderJob")
.start(jdbcPagingItemReaderStep())
.build();
}
@Bean
public Step jdbcPagingItemReaderStep() throws Exception {
return stepBuilderFactory.get("jdbcPagingItemReaderStep")
.<Pay, Pay>chunk(chunkSize)
.reader(jdbcPagingItemReader())
.writer(jdbcPagingItemWriter())
.build();
}
@Bean
public JdbcPagingItemReader<Pay> jdbcPagingItemReader() throws Exception {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("amount", 2000);
return new JdbcPagingItemReaderBuilder<Pay>()
.pageSize(chunkSize)
.fetchSize(chunkSize)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Pay.class))
.queryProvider(createQueryProvider())
.parameterValues(parameterValues)
.name("jdbcPagingItemReader")
.build();
}
private ItemWriter<Pay> jdbcPagingItemWriter() {
return list -> {
for (Pay pay: list) {
log.info("Current Pay={}", pay);
}
};
}
@Bean
public PagingQueryProvider createQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource); // Database에 맞는 PagingQueryProvider를 선택하기 위해
queryProvider.setSelectClause("id, amount, tx_name, tx_date_time");
queryProvider.setFromClause("from pay");
queryProvider.setWhereClause("where amount >= :amount");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
}
JdbcCursorItemReader 클래스와 크게 다른 점은 쿼리를 생성하는 부분입니다. JdbcPagingItemReader 클래스는 PagingQueryProvider
를 통해 쿼리를 생성합니다.
이렇게까지 하는 이유는 각 DB에 Paging을 지원하는 자체적인 전략 때문입니다.
{
providers.put(DB2, new Db2PagingQueryProvider());
providers.put(DB2VSE, new Db2PagingQueryProvider());
providers.put(DB2ZOS, new Db2PagingQueryProvider());
providers.put(DB2AS400, new Db2PagingQueryProvider());
providers.put(DERBY,new DerbyPagingQueryProvider());
providers.put(HSQL,new HsqlPagingQueryProvider());
providers.put(H2,new H2PagingQueryProvider());
providers.put(MYSQL,new MySqlPagingQueryProvider());
providers.put(ORACLE,new OraclePagingQueryProvider());
providers.put(POSTGRES,new PostgresPagingQueryProvider());
providers.put(SQLITE, new SqlitePagingQueryProvider());
providers.put(SQLSERVER,new SqlServerPagingQueryProvider());
providers.put(SYBASE,new SybasePagingQueryProvider());
}
Spring Batch는 SqlPagingQueryProviderFactoryBean을 통해 DataSource 설정 값을 보고 위 코드에서 작성된 Provider 중 하나를 자동으로 선택하도록 합니다. 이렇게 하면 코드 변경 사항이 적어서 Spring Batch에서 공식 지원하는 방법입니다.
그럼 다시 위의 코드들이 무엇을 의미하는지 살펴보겠습니다.
queryProvider.setWhereClause("where amount >= :amount");
where 절에서 선언된 파라미터 변수 명과 parameterValues.put("amount", 2000) 코드에서 key에 해당하는 변수명이 일치해야 합니다.
JdbcPagingItemReader는 추상클래스인 AbstractPagingItemReader를 상속받고 있습니다. Paging 쿼리로 limit, offset 만큼 조회를 하면 AbstractPagingItemReader가 내부적으로 가지고 있는 List<T> results
변수에서 페이지 크기만큼의 row들을 가지고 있다가 read() 메서드 호출 시에 List 내부에서 하나씩 Item을 리턴합니다.
정렬(Order)가 무조건 포함되어 있어야 합니다.
참조 사이트: https://n1tjrgns.tistory.com/159