[Spring Batch 5편] JdbcPagingItemReader, JdbcBatchItemWriter로 DB 읽고 쓰기

송하연·2024년 11월 5일
0
post-thumbnail

이전글 - [Spring Batch 4편] FlatFileItemReader, FlatFileItemWriter로 파일 읽고 써보기

오늘은 JdbcPagingItemReader로 파일을 일고 JdbcBatchItemWriter로 파일 쓰는 실습을 진행해보자

실습 1

실습 1. JdbcPagingItemReader로 DB 읽기
JdbcPagingItemReader를 사용하여 데이터베이스에서 데이터를 페이지 단위로 읽고, 읽어온 데이터를 csv파일로 저장하기

1. JdbcPagingItemReader 알아보기

JdbcPagingItemReader란?

Spring Batch에서 제공하는 ItemReader로, 데이터베이스로부터 데이터를 페이지 단위로 읽어오는 데 사용된다.

  • 대규모 데이터 처리 시 메모리 사용량 최소화
  • 커밋 간격을 설정하여 효율적인 데이터 처리
  • SQL 쿼리를 직접 작성해 최적화된 데이터 읽기
  • 데이터베이스 커서를 활용해 순회 제어

JdbcPagingItemReader의 주요 구성 요소

DataSource: 데이터베이스 연결 정보 설정
SqlQuery: 데이터를 읽어올 SQL 쿼리를 설정
RowMapper: SQL 쿼리 결과를 특정 Item 객체로 변환
PageSize: 페이지 크기를 설정하여 한 번에 읽어올 데이터 양을 지정
SkippableItemReader: 오류 발생 시 해당 Item을 건너뜀
ReadListener: 읽기 시작, 종료, 오류 발생 등의 이벤트 처리
SaveStateCallback: 잡이 중단된 경우 현재 상태를 저장하여, 재시작 시 이어서 처리

2. JdbcPagingItemReader 구현하기

(1) Customer 클래스 생성하기

데이터베이스에서 읽어온 데이터를 저장할 모델 클래스 (Customer 클래스)를 작성해주었다.

@Getter 
@Setter
public class Customer {
    private String name;
    private int age;
    private String gender;

}

(2) QueryProvider 생성하기

JdbcPagingReaderJobConfig Class안에 데이터를 읽어오기 위한 SQL 쿼리를 정의하는 PagingQueryProvider를 작성해주었다.

@Autowired
DataSource dataSource;


@Bean
public PagingQueryProvider queryProvider() throws Exception {
    SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    queryProvider.setDataSource(dataSource);  // 데이터 소스를 설정하여 DB에 맞는 PagingQueryProvider 선택
    queryProvider.setSelectClause("id, name, age, gender");  // 조회할 필드 지정
    queryProvider.setFromClause("from customer");  // 조회할 테이블 설정
    queryProvider.setWhereClause("where age >= :age");  // 조건절 설정

    Map<String, Order> sortKeys = new HashMap<>(1);
    sortKeys.put("id", Order.DESCENDING);  // 정렬 기준 설정

    queryProvider.setSortKeys(sortKeys);

    return queryProvider.getObject();
}

    }
  • setDataSource: 데이터 소스를 설정하여 데이터베이스와의 연결을 관리한다.
  • setSelectClause: SELECT 구문에서 조회할 필드(id, name, age, gender)를 지정한다.
  • setFromClause: 데이터를 가져올 테이블(customer)을 설정한다.
  • setWhereClause: 특정 조건(age >= 20)을 통해 데이터를 필터링한다.
  • setSortKeys: 정렬 기준(id 내림차순)을 지정하여 데이터의 정렬 방식을 설정한다.

(3) JdbcPagingItemReader 작성하기

JdbcPagingItemReader는 데이터베이스로부터 데이터를 페이지 단위로 읽어오는 역할을 한다. JdbcPagingReaderJobConfig Class 안에 JdbcPagingItemReader를 작성해주었다.

    @Bean
    public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception {

        Map<String, Object> parameterValue = new HashMap<>();
        parameterValue.put("age", 20);

        return new JdbcPagingItemReaderBuilder<Customer>()
                .name("jdbcPagingItemReader")
                .fetchSize(CHUNK_SIZE)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
                .queryProvider(queryProvider())
                .parameterValues(parameterValue)
                .build();
    }
  • name: ItemReader 이름 설정
  • fetchSize: 한 번에 읽어올 데이터의 개수를 설정하여 성능 최적화
  • dataSource: DataSource를 설정하여 데이터베이스와의 연결 관리
  • rowMapper: SQL 쿼리 결과를 Customer 객체로 변환하기 위해BeanPropertyRowMapper 사용
  • queryProvider: PagingQueryProvider를 설정하여 페이지 단위의 데이터를 가져오는 쿼리 제공
  • parameterValues: 쿼리에서 사용할 파라미터 값 설정

(4) JdbcPagingItemReader 전체코드

배치를 실행하기위한 기본적인 Job과 Step 설정이 추가된 전체코드이다.

package com.example.springbatch.jobs.JdbcPagingItemReader;

import com.example.springbatch.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class JdbcPagingReaderJobConfig {

    public static final int CHUNK_SIZE = 2;
    public static final String ENCODING = "UTF-8";
    public static final String JDBC_PAGING_CHUNK_JOB = "JDBC_PAGING_CHUNK_JOB";

    @Autowired
    DataSource dataSource;

    @Bean
    public JdbcPagingItemReader<Customer> jdbcPagingItemReader() throws Exception {

        Map<String, Object> parameterValue = new HashMap<>();
        parameterValue.put("age", 20);

        return new JdbcPagingItemReaderBuilder<Customer>()
                .name("jdbcPagingItemReader")
                .fetchSize(CHUNK_SIZE)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
                .queryProvider(queryProvider())
                .parameterValues(parameterValue)
                .build();
    }

    @Bean
    public PagingQueryProvider queryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);  // 데이터 소스를 설정하여 DB에 맞는 PagingQueryProvider 선택
        queryProvider.setSelectClause("id, name, age, gender");  // 조회할 필드 지정
        queryProvider.setFromClause("from customer");  // 조회할 테이블 설정
        queryProvider.setWhereClause("where age >= :age");  // 조건절 설정

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.DESCENDING);  // 정렬 기준 설정

        queryProvider.setSortKeys(sortKeys);
        return queryProvider.getObject();
    }

    @Bean
    public FlatFileItemWriter<Customer> customerFlatFileItemWriter() {
        return new FlatFileItemWriterBuilder<Customer>()
                .name("customerFlatFileItemWriter")
                .resource(new FileSystemResource("./output/customer_new_v1.csv")) //출력 경로 지정
                .encoding(ENCODING)
                .delimited().delimiter("\t")
                .names("Name", "Age", "Gender")
                .build();
    }


    @Bean
    public Step customerJdbcPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
        log.info("------------------ Init customerJdbcPagingStep -----------------");

        return new StepBuilder("customerJdbcPagingStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                .reader(jdbcPagingItemReader())
                .writer(customerFlatFileItemWriter())
                .build();
    }

    @Bean
    public Job customerJdbcPagingJob(Step customerJdbcPagingStep, JobRepository jobRepository) {
        log.info("------------------ Init customerJdbcPagingJob -----------------");
        return new JobBuilder(JDBC_PAGING_CHUNK_JOB, jobRepository) //잡이름
                .incrementer(new RunIdIncrementer())
                .start(customerJdbcPagingStep)
                .build();
    }
}

3. JdbcPagingItemReader 실행하기

이제 구현한 JdbcPagingItemReader를 실행하기위한 설정을 추가해보자.

(1) yml 파일에 실행할 Job 지정

하나의 프로젝트 안에 여러 Job을 구현한 경우, Spring Batch는 실행할 Job을 명확하게 지정해야 한다.

❗그렇지 않으면 Job name must be specified in case of multiple jobs 에러 메시지와 함께 에러가 발생하게 된다.❗

이를 해결하기 위해 application.yml 파일에 실행할 Job의 이름을 명시적으로 지정할 수 있다.

spring:
  batch:
    job:
      name: JDBC_PAGING_CHUNK_JOB
      enabled: true

아래와 같이 설정을 추가하여 위에서 작성한 JDBC_PAGING_CHUNK_JOB이라는 Job을 실행하도록 한다.

(2) customer 테이블 생성

customer 테이블을 데이터베이스에 생성하여, JdbcPagingItemReader가 읽을 수 있는 데이터 구조를 만든다.

CREATE TABLE customer (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(50),
    age INT,
    gender VARCHAR(10)
);

(3) 샘플 데이터 추가

테이블을 생성한 후, 아래와 QueryConsole에 아래 내용을 입력하여 샘플 데이터를 삽입하였다.

INSERT INTO customer (name, age, gender) VALUES ('person1', 22, 'Female');
INSERT INTO customer (name, age, gender) VALUES ('person2', 35, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person3', 28, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person4', 17, 'Female');
INSERT INTO customer (name, age, gender) VALUES ('person5', 35, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person6', 56, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person7', 22, 'Female');
INSERT INTO customer (name, age, gender) VALUES ('person8', 11, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('person9', 7, 'Male');
INSERT INTO customer (name, age, gender) VALUES ('hayeon', 24, 'Female');

(4) 스프링부트 실행 결과는..!

스프링부트 어플리케이션을 실행하면 우리가 설정한 출력 경로 ./output/customer_new_v1.csv에 출력 파일이 생성된 것을 확인할 수 있다.

customer_new_v1.csv 파일을 보면 Customer 테이블의 데이터가 저장되어있고 위 QueryProvider에서 설정한 조건(age >= 20, id 필드기준으로 내림차순 정렬)이 잘 적용되어 출력된 것을 확인할 수 있다.

(5) 스프링 배치 결과 분석 (메타데이터 스키마를 확인해보자)

방금 실행한 JOB이 잘 돌아갔나 확인하기 위해 Spring Batch의 메타데이터 스키마 BATCH_STEP_EXECUTION 테이블을 확인해보자

CHUNK_SIZE를 2로 지정했기 때문에, JdbcPagingItemReader는 데이터를 한 번에 2개씩 읽어올 수 있다.

BATCH_STEP_EXECUTION 테이블에서 내가 실행한 JOB의 READ_COUNT가 7로 표시된 것은 총 7개의 데이터를 읽었음을 확인할 수 있다.

COMMIT_COUNT가 4로 나타난 것은 7개의 데이터를 2개씩 3번, 나머지 1개를 마지막으로 한 번 더 커밋하여 총 4번에 걸쳐 데이터를 처리했음을 확인할 수 있다.

실습 2

실습 2. JdbcBatchItemWriter로 DB에 쓰기
JdbcBatchItemWriter를 이용하여 CSV 파일로부터 데이터를 읽어와 데이터베이스에 insert하기

1. JdbcBatchItemWriter 알아보기

JdbcBatchItemWriter란?

JdbcBatchItemWriter는 Spring Batch에서 데이터베이스에 데이터를 저장하는 ItemWriter 구현체로 대량의 데이터를 JDBC를 통해 효율적으로 저장할 때 주로 사용된다.

JdbcBatchItemWriter 주요 구성 요소

  • DataSource: 데이터베이스 연결을 위한 정보 제공
  • SqlStatementCreator: 데이터 저장 시 필요한 INSERT 쿼리를 생성
  • PreparedStatementSetter: INSERT 쿼리에 필요한 파라미터 값을 설정
  • ItemSqlParameterSourceProvider: Customer와 같은 객체의 필드를 쿼리에 전달할 파라미터로 변환

JdbcBatchItemWriter의 장단점

장점

  • 다양한 데이터베이스 연동 가능
  • 대량 데이터를 빠르게 처리할 수 있어 성능에 유리
  • 설정을 통해 데이터 저장 방식에 대한 유연한 제어 가능

단점

  • 설정이 복잡할 수 있음
  • 데이터베이스 설정에 대한 종속성이 생길 수 있음.
  • 설정 오류 발생 시 데이터 손상의 가능성 있음.

2. JdbcBatchItemWriter 구현하기

(1) JdbcBatchItemWriter 작성하기

JdbcBatchItemWriter는 데이터를 데이터베이스 테이블에 일괄적으로 저장하는 데 사용한다.
아래 코드 에서는 customer 테이블에 Customer 객체의 데이터를 저장하도록 한다.

JdbcBatchItemJobConfig Class 안에 JdbcBatchItemWriter를 작성해주었다.

@Bean
    public JdbcBatchItemWriter<Customer> jdbcBatchItemWriter() {
        return new JdbcBatchItemWriterBuilder<Customer>()
                .dataSource(dataSource)
                .sql("INSERT INTO customer (nam![](https://velog.velcdn.com/images/hanni/post/3536ed28-3463-4df0-8a99-2ede702dc2dc/image.png)
e, age, gender) VALUES (:name, :age, :gender)")
                .itemSqlParameterSourceProvider(new CustomerItemSqlParameterSourceProvider())
                .build();
    }
  • dataSource: 데이터베이스에 연결하기 위한 정보 지정
  • sql: 데이터를 삽입할 SQL 구문을 작성. :name, :age, :gender는 Customer 객체의 필드 값으로 매핑됨
  • itemSqlParameterSourceProvider: Customer 객체의 필드를 SQL 파라미터로 변환하는 역할을 하는 클래스(CustomerItemSqlParameterSourceProvider) 설정

(2) CustomerItemSqlParameterSourceProvider 작성하기

CustomerItemSqlParameterSourceProvider는 Customer 객체의 필드를 SQL 쿼리의 파라미터로 변환해주는 역할을 한다.

CustomerItemSqlParameterSourceProvider Class 클래스를 새로 생성해 작성해주었다.

public class CustomerItemSqlParameterSourceProvider implements ItemSqlParameterSourceProvider<Customer> {
    @Override
    public SqlParameterSource createSqlParameterSource(Customer item) {
        return new BeanPropertySqlParameterSource(item);
    }
}

(3) JdbcBatchItemJobConfig 전체코드

배치를 실행하기위한 기본적인 Job과 Step 설정이 추가된 전체코드이다.

package com.example.springbatch.jobs.JdbcPagingItemReader;


import com.example.springbatch.jobs.models.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Slf4j
@Configuration
public class JdbcBatchItemJobConfig {

    public static final int CHUNK_SIZE = 100;
    public static final String ENCODING = "UTF-8";
    public static final String JDBC_BATCH_WRITER_CHUNK_JOB = "JDBC_BATCH_WRITER_CHUNK_JOB";

    @Autowired
    DataSource dataSource;

    @Bean
    public FlatFileItemReader<Customer> flatFileItemReader2() {

        return new FlatFileItemReaderBuilder<Customer>()
                .name("FlatFileItemReader")
                .resource(new ClassPathResource("static/customer.csv")) //입력 경로
                .encoding(ENCODING)
                .delimited().delimiter(",")
                .names("name", "age", "gender")
                .targetType(Customer.class)
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Customer> jdbcBatchItemWriter() {
        return new JdbcBatchItemWriterBuilder<Customer>()
                .dataSource(dataSource)
                .sql("INSERT INTO customer (name, age, gender) VALUES (:name, :age, :gender)")
                .itemSqlParameterSourceProvider(new CustomerItemSqlParameterSourceProvider())
                .build();
    }


    @Bean
    public Step customerJdbcBatchingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        log.info("------------------ Init customerJdbcPagingStep -----------------");

        return new StepBuilder("customerJdbcBatchingStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                .reader(flatFileItemReader2())
                .writer(jdbcBatchItemWriter())
                .build();
    }

    @Bean
    public Job customerJdbcBatchingJob(Step customerJdbcBatchingStep, JobRepository jobRepository) {
        log.info("------------------ Init customerJdbcBatchingStep -----------------");
        return new JobBuilder(JDBC_BATCH_WRITER_CHUNK_JOB, jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(customerJdbcBatchingStep)
                .build();
    }



}

3. JdbcBatchItemWriter 실행하기

(1) yml 파일에 실행할 Job 지정

application.yml 파일에 실행할 Job의 이름을 JDBC_BATCH_WRITER_CHUNK_JOB로 지정해주었다.

spring:
  batch:
    job:
      name: JDBC_BATCH_WRITER_CHUNK_JOB

(2) insert할 CSV 데이터 추가하기

resources/static 디렉토리에 customer.csv파일을 추가해주었다.

unclebae,40,Male
superman,45,Male
WonderWoman,30,Female

(3) 스프링부트 실행 결과는..!

스프링부트를 실행하면 customer 테이블에 새로운 데이터가 추가되었다.

기존 데이터 위에 CSV 파일로부터 읽어온 데이터가 정상적으로 삽입되었음을 확인할 수 있다.

참고) 디렉토리 구조

마무리

오늘은 실습을 통해 JdbcPagingItemReader로 데이터베이스에서 데이터를 읽어와 파일로 저장하고, JdbcBatchItemWriter로 파일 데이터를 데이터베이스에 일괄 저장하는 방법에 대해 알아보았다.

참고글 - [SpringBatch 연재 05] JdbcPagingItemReader로 DB내용을 읽고, JdbcBatchItemWriter로 DB에 쓰기
깃허브 - https://github.com/hysong4u/springbatch

profile
개발 기록 끄적끄적✏️ #백엔드개발자

1개의 댓글

comment-user-thumbnail
2024년 11월 7일

고생많았어요 ㅠㅠ 하연님이 발표했으면 더 깔끔했을것 같아요 ㅠㅠ

답글 달기