[ 정수원 스프링 배치 #10 ] - DB ItemReader

정동욱·2023년 10월 24일
0
post-thumbnail

이번 글에서는 배치를 통해 DB에서 데이터를 읽어오는 여러 ItemReader들에 대해 알아볼겠습니다. 스프링 배치에서 DB로부터 데이터를 읽어올 때엔 CursorPaging 두 방식이 있어 우선 이것에 대한 이해가 필요합니다.

Cursor 방식은 자신이 데이터를 가져올 때 어디까지 가져오는 지 그 위치를 알고 있습니다. 그리고 기본적으로 1개의 데이터씩 가져옵니다. 그러니까 만약 10개의 데이터를 가지고 오려면 10번의 I/O가 발생하게 되고, 때에 따라서는 10번의 connection의 연결/종료가 발생할 수도 있는 것이죠. 그런데 fetchSize를 지정해주면 이 크기만큼 데이터를 한 번에 가져오게 되고, 만약 10을 지정해주면 한 번에 10개를 가져오게 되는 것이죠. 그리고 다음 지점부터 또 10 개의 데이터를 읽어오게 됩니다. 이 방식은 모든 Cursor 작업이 끝나기 전까지 DB 커넥션은 물고 있기에 ConnectionTime이나 SocketTimeout을 오래 잡아주어야 하고, 또 모든 데이터를 메모리에 올리기 때문에 메모리 사용량이 커지게 됩니다.

Paging 기반으로 데이터를 읽을 땐 offset과 limit을 주어 어디서부터 몇 개의 데이터를 DB로 부터 가져올 지 설정합니다. 해당 쿼리가 실행될 동안만 DB 커넥션을 물고 있기 때문에 짧은 시간 내에 DB 커넥션을 반환하고, Paging 단위의 결과만 가져오기 때문에 메모리 공간을 효율적으로 사용합니다.

Cursor 기반의 ItemReader의 경우 JdbcCursorItemReaderJpaCursorItemReader 두 가지가 있습니다. 이 중 JdbcCursorItemReader에 대해 살펴보겠습니다.

JdbcCursorItemReader는 jdcb를 이용해 DB와 연동하는 방식입니다. 예전에 본 connection, preparedStatement, resultSet을 사용하는 방식이죠. 내부적으로 ItemStream을 사용합니다. 코드로 볼텐데요, 역시 빌더를 사용해 간단하게 구현할 수 있습니다. 주의할 점은 fetchSize() API를 통해 한 번에 가져올 데이터의 수와 chunk() API에서 지정한 사이즈를 동일하게 맞추는 게 좋습니다.

@Configuration
@RequiredArgsConstructor
public class JobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private final DataSource dataSource;
    private final String sql1 = "SELECT * FROM customer WHERE firstName like ? ORDER BY lastName, firstName";
    private final String sql2 = "SELECT * FROM customer ORDER BY lastName, firstName";
    private final String arg = "A%";

    @Bean
    public Job batchjob() {
        return jobBuilderFactory.get("batchjob")
                .start(step1())
                .build();
    }

    @Bean
    public ItemReader<Customer> jdcbCursorItemReader() {
        return new JdbcCursorItemReaderBuilder<Customer>()
                .name("jdcbCursorItemReader")
                .fetchSize(10) // 한 번에 가져올 데이터 수
                .sql(sql1) // 실행할 sql문
                .beanRowMapper(Customer.class) // 변환할 클래스
                .queryArguments(arg) // sql문 내 인자
                .dataSource(dataSource) // DB 설정
                .build();
    }

    @Bean
    public ItemWriter<Customer> itemWriter() {
        return new CustomItemWriter();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(jdcbCursorItemReader())
                .writer(itemWriter())
                .allowStartIfComplete(true)
                .build();
    }
}

이제 JpaCursorItemReader에 대해 살펴볼텐데요, JpaCursorItemReader는 조금 더 단순합니다. DataSource가 아닌 EntityManager를 설정해주는 차이가 있습니다. 또한 별도로 fetchSize를 설정하지 않고 Step에서 설정한 chunkSize를 기준으로 처리합니다. 코드로 보겠습니다.

@Configuration
@RequiredArgsConstructor
public class JobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    private final String sql3 = "SELECT c FROM Customer c WHERE firstName like :firstName ORDER BY lastName, firstName";
    private final String sql4 = "SELECT c FROM Customer c";

    @Bean
    public Job batchjob() {
        return jobBuilderFactory.get("batchjob")
                .start(step1())
                .build();
    }

    @Bean
    public ItemReader<Customer> jpaCursorItemReader() {
        HashMap<String, Object> parameters = new HashMap<>();
        parameters.put("firstName", "A%");

        return new JpaCursorItemReaderBuilder<Customer>()
                .name("jpaCursorItemReader")
                .entityManagerFactory(entityManagerFactory) // EntityManager 설정
                .queryString(sql3) // 실행할 jpql문
                .parameterValues(parameters) // jpql문 내 인자
                .build();
    }

    @Bean
    public ItemWriter<Customer> itemWriter() {
        return new CustomItemWriter();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(jpaCursorItemReader())
                .writer(itemWriter())
                .allowStartIfComplete(true)
                .build();
    }
}

이제 Paging 기반의 ItemReader를 살펴볼텐데요, 종류는 위와 동일하게 JdbcPagingItemReader, JpaPagingItemReader 두 가지가 있습니다. 이 중 JdbcPagingItemReader를 먼저 살펴보겠습니다. pageSize를 설정해주고, 매핑할 클래스를 설정해준 다음 PagingQueryProvider를 설정해줍니다. 간단하게 코드만 보겠습니다.

@Configuration
@RequiredArgsConstructor
public class JobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Bean
    public Job batchjob() {
        return jobBuilderFactory.get("batchjob")
                .start(step1())
                .build();
    }

    @Bean
    public ItemReader<Customer> jdbcPagingItemReader() throws Exception {
        return new JdbcPagingItemReaderBuilder<Customer>()
                .name("jdcbPagingItemReader")
                .pageSize(10) // 사이즈 설정
                .dataSource(dataSource) // DB 설정
                .rowMapper(new BeanPropertyRowMapper<>(Customer.class)) // 매핑할 클래스 설정
                .queryProvider(pagingQueryProvider()) // PagingQueryProvider 설정
                .build();
    }

    @Bean
    public PagingQueryProvider pagingQueryProvider() throws Exception {
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("*");
        queryProvider.setFromClause("FREOM customer");
        queryProvider.setWhereClause("WHERE firstName like :firstName");
        queryProvider.setSortKeys(sortKeys); // 정렬 설정

        return queryProvider.getObject();
    }
    
    @Bean
    public ItemWriter<Customer> itemWriter() {
        return new CustomItemWriter();
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(jdbcPagingItemReader())
                .writer(itemWriter())
                .allowStartIfComplete(true)
                .build();
    }
}

이제 JpaPagingItemReader에 대해 살펴볼텐데요, 이 역시 위에서 본 JpaCursorItemReader와 비슷합니다. 단지 Paging 방식이기 때문에 가져올 데이터의 크기를 지정해주어야 합니다.

@Configuration
@RequiredArgsConstructor
public class JobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    private final String sql3 = "SELECT c FROM Customer c WHERE firstName like :firstName ORDER BY lastName, firstName";
    private final String sql4 = "SELECT c FROM Customer c";

    @Bean
    public Job batchjob() {
        return jobBuilderFactory.get("batchjob")
                .start(step1())
                .build();
    }

    @Bean
    public ItemReader<Customer> jpaPagingItemReader() {
        HashMap<String, Object> parameters = new HashMap<>();
        parameters.put("firstName", "A%");

        return new JpaPagingItemReaderBuilder<Customer>()
                .name("jpaCursorItemReader")
                .pageSize(10) // 가져올 데이터 갯수
                .entityManagerFactory(entityManagerFactory) // EntityManager 설정
                .queryString(sql3) // 실행할 jpql문
                .parameterValues(parameters) // jpql문 내 인자
                .build();
    }

    @Bean
    public ItemWriter<Customer> itemWriter() {
        return new CustomItemWriter();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(jpaPagingItemReader())
                .writer(itemWriter())
                .allowStartIfComplete(true)
                .build();
    }
}

이번 글에서는 DB에서 데이터를 읽어오는 ItemReader의 4가지 구현체들을 알아봤습니다. 다음 글에서는 ItemWriter의 구현체들에 대해 알아보겠습니다.

profile
거인의 어깨 위에서 탭댄스를

0개의 댓글

관련 채용 정보