[Spring Batch 6편] JpaPagingItemReader, JpaItemWriter로 DB 읽고 쓰기

송하연·2024년 11월 12일
0
post-thumbnail

이전글 - [Spring Batch 5편] JdbcPagingItemReader, JdbcBatchItemWriter로 읽고 쓰기

오늘은 JpaPagingItemReader로 파일을 일고 JpaItemWriter로 파일 쓰는 실습을 진행해보자

실습 1

실습 1. JpaPagingItemReader로 DB 읽기
JpaPagingItemReader를 사용하여 데이터베이스에서 데이터를 페이지 단위로 읽고, 읽어온 데이터를 csv파일로 저장하기

JpaPagingItemReader 알아보기

JpaPagingItemReader는 Spring Batch에서 JPA를 사용하여 데이터베이스로부터 데이터를 페이지 단위로 읽는 역할을 하는 ItemReader이다.

1) JPA 기능 활용
JPA 엔티티 매핑과 쿼리 기능을 활용하여 데이터를 객체로 자동 변환하며, 데이터베이스와 애플리케이션 간 데이터를 쉽게 관리할 수 있도록 한다.

2) 쿼리 최적화
JPA 쿼리를 사용해 필요한 데이터만 효율적으로 조회하여 불필요한 데이터 로드를 줄인다.

3) 커서 제어
JPA Criteria API를 통해 데이터를 순회하고 조건을 동적으로 설정할 수 있도록 한다.

JpaPagingItemReader 주요 구성 요소

EntityManagerFactory
JPA 엔티티 매니저를 생성하여 데이터베이스 연결을 설정한다.

JpaQueryProvider
데이터를 읽을 때 사용할 JPA 쿼리를 제공한다.

PageSize
한 번에 읽어올 데이터의 크기를 설정한다.

SkippableItemReader
오류 발생 시 해당 item을 건너뛸 수 있게 한다.

ReadListener
읽기 시작, 종료, 오류 발생 등의 이벤트를 처리할 수 있게 한다.

SaveStateCallback
작업 중단 시 상태를 저장하여 재시작 시 이어서 처리할 수 있도록 한다.

JpaPagingItemReader 구현하기

(1) build.gradle에 의존성 추가

jpa 기능을 사용하기위해 build.gradle에 jpa 관련 의존성을 추가해주었다.

//jpa
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'

(2) Customer 클래스 생성

Customer 클래스를 작성한 뒤 JPA를 통해 데이터베이스와 매핑될 수 있도록 Customer 클래스를 엔티티로 설정하였다.

package com.example.springbatch.jobs.entity;

import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Entity
@Table(name = "customer")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Customer {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private int id;

    private String name;
    private Integer age;
    private String gender;

(3) JpaPagingItemReader 작성하기

방법 1: 생성자를 사용한 JpaPagingItemReader 설정

@Bean
public JpaPagingItemReader<Customer> customerJpaPagingItemReader() throws Exception {
    JpaPagingItemReader<Customer> jpaPagingItemReader = new JpaPagingItemReader<>();
    jpaPagingItemReader.setQueryString(
            "SELECT c FROM Customer c WHERE c.age > :age order by id desc"
    );
    jpaPagingItemReader.setEntityManagerFactory(entityManagerFactory);
    jpaPagingItemReader.setPageSize(CHUNK_SIZE);
    jpaPagingItemReader.setParameterValues(Collections.singletonMap("age", 20));
    return jpaPagingItemReader;
}
  • setQueryString: JPQL 쿼리를 사용하여 데이터를 가져올 조건과 정렬 방식을 설정한다.
    ex) age가 20 이상인 데이터를 id 내림차순으로 가져온다.

  • setEntityManagerFactory: JPA 엔터티 매니저 팩토리를 지정하여 데이터베이스와 연결한다.

  • setPageSize: 한 번에 읽어올 페이지 크기를 설정하며, 청크 사이즈와 맞춰주는 것이 일반적이다.

  • setParameterValues: JPQL 쿼리에 사용할 파라미터를 지정한다.


방법 2: JpaPagingItemReaderBuilder를 사용한 설정

@Bean
public JpaPagingItemReader<Customer> customerJpaPagingItemReader() throws Exception {
    return new JpaPagingItemReaderBuilder<Customer>()
            .name("customerJpaPagingItemReader")
            .queryString("SELECT c FROM Customer c WHERE c.age > :age order by id desc")
            .pageSize(CHUNK_SIZE)
            .entityManagerFactory(entityManagerFactory)
            .parameterValues(Collections.singletonMap("age", 20))
            .build();
}

(4) ItemProcessor 작성하기

ItemProcessor는 데이터를 읽은 후 처리할 때 사용하는 단계로, 읽어온 데이터를 검증하거나 변환하는 데 주로 사용된다. 이 예제에서는 단순히 데이터를 로그로 출력하는 역할만 한다.

@Slf4j
public class CustomerItemProcessor implements ItemProcessor<Customer, Customer> {
    @Override
    public Customer process(Customer item) throws Exception {
        log.info("Item Processor ------------------- {}", item);
        return item;
    }
}
package com.example.springbatch.jobs.JpaPagingItemReader;

import com.example.springbatch.jobs.entity.Customer;
import jakarta.persistence.EntityManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.Collections;

@Slf4j
@Configuration
public class JpaPagingReaderJobConfig {

    public static final int CHUNK_SIZE = 2;
    public static final String ENCODING = "UTF-8";
    public static final String JPA_PAGING_CHUNK_JOB = "JPA_PAGING_CHUNK_JOB";

    @Autowired
    DataSource dataSource;

    @Autowired
    EntityManagerFactory entityManagerFactory;

    //방법 2: JpaPagingItemReaderBuilder를 사용한 설정
    @Bean
    public JpaPagingItemReader<Customer> customerJpaPagingItemReader() throws Exception {

        return new JpaPagingItemReaderBuilder<Customer>()
                .name("customerJpaPagingItemReader")
                .queryString("SELECT c FROM Customer c WHERE c.age > :age order by id desc")
                .pageSize(CHUNK_SIZE)
                .entityManagerFactory(entityManagerFactory)
                .parameterValues(Collections.singletonMap("age", 20))
                .build();
    }

    //  방법 1: 생성자를 사용한 JpaPagingItemReader 설정
    /*
    @Bean
    public JpaPagingItemReader<Customer> customerJpaPagingItemReader() throws Exception {
        JpaPagingItemReader<Customer> jpaPagingItemReader = new JpaPagingItemReader<>();
        jpaPagingItemReader.setQueryString(
            "SELECT c FROM Customer c WHERE c.age > :age order by id desc"
     );
        jpaPagingItemReader.setEntityManagerFactory(entityManagerFactory);
        jpaPagingItemReader.setPageSize(CHUNK_SIZE);
        jpaPagingItemReader.setParameterValues(Collections.singletonMap("age", 20));
         return jpaPagingItemReader;
}
     */

    @Bean
    public FlatFileItemWriter<Customer> customerJpaFlatFileItemWriter() {
        return new FlatFileItemWriterBuilder<Customer>()
                .name("customerJpaFlatFileItemWriter")
                .resource(new FileSystemResource("./output/customer_new_v2.csv"))
                .encoding(ENCODING)
                .delimited().delimiter("\t")
                .names("name", "age", "gender")
                .build();
    }


    @Bean
    public Step customerJpaPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
        log.info("------------------ Init customerJpaPagingStep -----------------");

        return new StepBuilder("customerJpaPagingStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                .reader(customerJpaPagingItemReader())
                .processor(new CustomerItemProcessor())
                .writer(customerJpaFlatFileItemWriter())
                .build();
    }

    @Bean
    public Job customerJpaPagingJob(Step customerJpaPagingJob, JobRepository jobRepository) {
        log.info("------------------ Init customerJpaPagingJob -----------------");
        return new JobBuilder(JPA_PAGING_CHUNK_JOB, jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(customerJpaPagingJob)
                .build();
    }
}

JpaPagingItemReader 실행하기

(1) JpaPagingItemReader 실행 전 세팅

이전 실습에서 했던 것 처럼 application.yml에 실행할 JOB을 지정하고, JPA 관련 설정을 추가해주었다.

spring:
  batch:
    jdbc:
      initialize-schema: always
    job:
      name: JPA_PAGING_CHUNK_JOB
      enabled: true

  jpa:
    hibernate:
      ddl-auto: update
    show-sql: true
    properties:
      hibernate:
        format_sql: true

Customer 테이블이 없다면 테이블을 만들어주고 mock data를 추가하였다.

(2) 실행 결과..!

우리가 지정한 출력경로에 customer_new_v2.csv파일이 생성되었고, 앞서 작성했던 쿼리 조건에 맞는 데이터 (age가 20 이상인 데이터를 id 내림차순)를 알맞게 가져온 것을 확인할 수 있다.

(3) 메타데이터 스키마를 확인해보자

방금 실행한 JOB이 잘 돌아갔나 확인하기 위해 Spring Batch의 메타데이터 스키마 BATCH_STEP_EXECUTION 테이블을 확인해보자

✅CHUNK_SIZE를 2로 지정했기 때문에, JdbcPagingItemReader 데이터를 한 번에 2개씩 읽어올 수 있다.

✅ BATCH_STEP_EXECUTION 테이블에서 내가 실행한 JOB의 READ_COUNT가 7로 표시된 것은 총 7개의 데이터를 읽었음을 확인할 수 있다.

✅ COMMIT_COUNT가 4로 나타난 것은 7개의 데이터를 2개씩 3번, 나머지 1개를 마지막으로 한 번 더 커밋하여 총 4번에 걸쳐 데이터를 처리했음을 확인할 수 있다.

실습 2

실습 2. JpaItemWriter로 DB에 쓰기
JpaItemWriter를 이용하여 CSV 파일로부터 데이터를 읽어와 데이터베이스에 insert하기

JpaItemWriter 알아보기

JpaItemWriter는 Spring Batch에서 제공하는 ItemWriter 인터페이스를 구현하는 클래스로 데이터를 JPA를 통해 데이터베이스에 저장하는 데 사용된다.

장점

  • ORM 연동: JPA를 사용하여 다양한 데이터베이스와 유연하게 연동
  • 객체 매핑: 엔터티 객체를 직접 저장하여 코드가 간결
  • 유연성: 설정을 통해 다양한 방식으로 데이터를 저장할 수 있다.
  • 데이터베이스 종속성 줄임: 방언을 설정함으로써 JPA가 각 데이터베이스의 특성에 맞는 SQL을 생성

단점

  • 설정 복잡성: JPA 설정과 쿼리 작성이 복잡
  • 오류 가능성: 설정 오류 시 데이터 손상 위험성

JpaItemWriter 주요 구성 요소

  • EntityManagerFactory : JPA EntityManager 생성을 위한 팩토리 객체로, 데이터베이스와의 연결을 설정한다.
  • JpaQueryProvider : 저장할 엔터티에 대한 JPA 쿼리를 생성하는 역할을 담당하여 필요한 조건이나 설정을 적용할 수 있게 한다.

JpaItemWriter 구현하기

(1) JpaItemWriter 작성하기

  @Bean
    public JpaItemWriter<Customer> jpaItemWriter() {
        return new JpaItemWriterBuilder<Customer>()
                .entityManagerFactory(entityManagerFactory)
                .usePersist(true)
                .build();
    }

(2) 전체코드

package com.example.springbatch.jobs.JpaPagingItemReader;

import com.example.springbatch.jobs.entity.Customer;
import jakarta.persistence.EntityManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

@Slf4j
@Configuration
public class JpaItemJobConfig {

    public static final int CHUNK_SIZE = 100;
    public static final String ENCODING = "UTF-8";
    public static final String JPA_ITEM_WRITER_JOB = "JPA_ITEM_WRITER_JOB";

    @Autowired
    EntityManagerFactory entityManagerFactory;

    @Bean
    public FlatFileItemReader<Customer> flatFileItemReader3() {

        return new FlatFileItemReaderBuilder<Customer>()
                .name("FlatFileItemReader")
                .resource(new ClassPathResource("/static/customer.csv"))
                .encoding(ENCODING)
                .delimited().delimiter(",")
                .names("name", "age", "gender")
                .targetType(Customer.class)
                .build();
    }

    @Bean
    public JpaItemWriter<Customer> jpaItemWriter() {
        return new JpaItemWriterBuilder<Customer>()
                .entityManagerFactory(entityManagerFactory)
                .usePersist(true)
                .build();
    }


    @Bean
    public Step customerJpaStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        log.info("------------------ Init flatFileStep -----------------");

        return new StepBuilder("flatFileStep", jobRepository)
                .<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
                .reader(flatFileItemReader3())
                .writer(jpaItemWriter())
                .build();
    }

    @Bean
    public Job customerJpaJob(Step customerJpaStep, JobRepository jobRepository) {
        log.info("------------------ Init flatFileJob -----------------");
        return new JobBuilder(JPA_ITEM_WRITER_JOB, jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(customerJpaStep)
                .build();
    }
}

JpaItemWriter 실행하기

(1) yml 파일에 실행할 Job 지정

application.yml 파일에 실행할 Job의 이름을 JPA_ITEM_WRITER_JOB로 지정해주었다.

spring:
  batch:
    job:
      name: JDBC_BATCH_WRITER_CHUNK_JOB

(2) insert할 CSV 데이터 추가하기

resources/static 디렉토리에 customer.csv파일을 추가해주었다.

unclebae,40,Male
superman,45,Male
WonderWoman,30,Female

(3) 실행 결과..!

스프링부트를 실행하면 customer 테이블에 새로운 데이터가 추가되었다.

기존 데이터 위에 CSV 파일로부터 읽어온 데이터가 정상적으로 삽입되었음을 확인할 수 있다.

Jpa기반 VS JDBC 기반 차이점

JdbcPagingItemReader/JdbcBatchItemWriter

  • JDBC를 통해 데이터베이스에서 데이터를 읽고 쓰는 구성 요소들이다.
  • JdbcPagingItemReaderSQL 쿼리를 사용하여 데이터베이스에서 원하는 데이터를 필터링하고 페이징 처리를 수행하며, RowMapper를 사용하여 결과를 자바 객체로 수동 매핑한다.
  • JdbcBatchItemWriter는 SQL 쿼리를 직접 작성하여 데이터를 데이터베이스에 삽입하는 방식이다.
  • SQL 제어가 필요할 때 유리하며, SQL을 통해 각 데이터 처리를 세밀하게 관리할 수 있다.

JpaPagingItemReader/JpaItemWriter

  • JPA를 사용하여 엔티티를 기반으로 데이터베이스와 상호작용하는 구성 요소들이다.
  • JpaPagingItemReaderJPQL 쿼리를 통해 데이터베이스에서 엔티티를 페이징하여 읽어오며, JPA의 엔티티 매핑을 통해 데이터를 객체로 자동 변환한다.
  • JpaItemWriter는 JPA의 엔티티 매핑 기능을 통해 데이터를 데이터베이스에 자동으로 저장하는 방식으로, JPQL이나 엔티티 매니저를 사용하여 데이터베이스와 상호작용한다.
  • JPA를 사용하는 프로젝트에서 객체-관계 매핑이 필요할 때 적합하다.

참고) 디렉토리 구조

마무리

오늘은 실습을 통해 JpaPagingItemReader로 데이터베이스에서 데이터를 읽어와 파일로 저장하고, JpaItemWriter로 파일 데이터를 데이터베이스에 일괄 저장하는 방법에 대해 알아보았다.

참고글 - [SpringBatch 연재 06] JpaPagingItemReader로 DB내용을 읽고, JpaItemWriter로 DB에 쓰기

깃허브 - https://github.com/hysong4u/springbatch

profile
개발 기록 끄적끄적✏️ #백엔드개발자

0개의 댓글