이전글 - [Spring Batch 5편] JdbcPagingItemReader, JdbcBatchItemWriter로 읽고 쓰기
오늘은 JpaPagingItemReader로 파일을 일고 JpaItemWriter로 파일 쓰는 실습을 진행해보자
실습 1. JpaPagingItemReader로 DB 읽기
JpaPagingItemReader
를 사용하여 데이터베이스에서 데이터를 페이지 단위로 읽고, 읽어온 데이터를 csv파일로 저장하기
JpaPagingItemReader
는 Spring Batch에서 JPA를 사용하여 데이터베이스로부터 데이터를 페이지 단위로 읽는 역할을 하는 ItemReader이다.
1) JPA 기능 활용
JPA 엔티티 매핑과 쿼리 기능을 활용하여 데이터를 객체로 자동 변환하며, 데이터베이스와 애플리케이션 간 데이터를 쉽게 관리할 수 있도록 한다.
2) 쿼리 최적화
JPA 쿼리를 사용해 필요한 데이터만 효율적으로 조회하여 불필요한 데이터 로드를 줄인다.
3) 커서 제어
JPA Criteria API를 통해 데이터를 순회하고 조건을 동적으로 설정할 수 있도록 한다.
EntityManagerFactory
JPA 엔티티 매니저를 생성하여 데이터베이스 연결을 설정한다.
JpaQueryProvider
데이터를 읽을 때 사용할 JPA 쿼리를 제공한다.
PageSize
한 번에 읽어올 데이터의 크기를 설정한다.
SkippableItemReader
오류 발생 시 해당 item을 건너뛸 수 있게 한다.
ReadListener
읽기 시작, 종료, 오류 발생 등의 이벤트를 처리할 수 있게 한다.
SaveStateCallback
작업 중단 시 상태를 저장하여 재시작 시 이어서 처리할 수 있도록 한다.
jpa 기능을 사용하기위해 build.gradle
에 jpa 관련 의존성을 추가해주었다.
//jpa
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
Customer
클래스를 작성한 뒤 JPA를 통해 데이터베이스와 매핑될 수 있도록 Customer
클래스를 엔티티로 설정하였다.
package com.example.springbatch.jobs.entity;
import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Entity
@Table(name = "customer")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Customer {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id;
private String name;
private Integer age;
private String gender;
방법 1: 생성자를 사용한 JpaPagingItemReader 설정
@Bean
public JpaPagingItemReader<Customer> customerJpaPagingItemReader() throws Exception {
JpaPagingItemReader<Customer> jpaPagingItemReader = new JpaPagingItemReader<>();
jpaPagingItemReader.setQueryString(
"SELECT c FROM Customer c WHERE c.age > :age order by id desc"
);
jpaPagingItemReader.setEntityManagerFactory(entityManagerFactory);
jpaPagingItemReader.setPageSize(CHUNK_SIZE);
jpaPagingItemReader.setParameterValues(Collections.singletonMap("age", 20));
return jpaPagingItemReader;
}
setQueryString
: JPQL 쿼리를 사용하여 데이터를 가져올 조건과 정렬 방식을 설정한다.
ex) age가 20 이상인 데이터를 id 내림차순으로 가져온다.
setEntityManagerFactory
: JPA 엔터티 매니저 팩토리를 지정하여 데이터베이스와 연결한다.
setPageSize
: 한 번에 읽어올 페이지 크기를 설정하며, 청크 사이즈와 맞춰주는 것이 일반적이다.
setParameterValues
: JPQL 쿼리에 사용할 파라미터를 지정한다.
방법 2: JpaPagingItemReaderBuilder를 사용한 설정
@Bean
public JpaPagingItemReader<Customer> customerJpaPagingItemReader() throws Exception {
return new JpaPagingItemReaderBuilder<Customer>()
.name("customerJpaPagingItemReader")
.queryString("SELECT c FROM Customer c WHERE c.age > :age order by id desc")
.pageSize(CHUNK_SIZE)
.entityManagerFactory(entityManagerFactory)
.parameterValues(Collections.singletonMap("age", 20))
.build();
}
ItemProcessor는 데이터를 읽은 후 처리할 때 사용하는 단계로, 읽어온 데이터를 검증하거나 변환하는 데 주로 사용된다. 이 예제에서는 단순히 데이터를 로그로 출력하는 역할만 한다.
@Slf4j
public class CustomerItemProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer item) throws Exception {
log.info("Item Processor ------------------- {}", item);
return item;
}
}
package com.example.springbatch.jobs.JpaPagingItemReader;
import com.example.springbatch.jobs.entity.Customer;
import jakarta.persistence.EntityManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.Collections;
@Slf4j
@Configuration
public class JpaPagingReaderJobConfig {
public static final int CHUNK_SIZE = 2;
public static final String ENCODING = "UTF-8";
public static final String JPA_PAGING_CHUNK_JOB = "JPA_PAGING_CHUNK_JOB";
@Autowired
DataSource dataSource;
@Autowired
EntityManagerFactory entityManagerFactory;
//방법 2: JpaPagingItemReaderBuilder를 사용한 설정
@Bean
public JpaPagingItemReader<Customer> customerJpaPagingItemReader() throws Exception {
return new JpaPagingItemReaderBuilder<Customer>()
.name("customerJpaPagingItemReader")
.queryString("SELECT c FROM Customer c WHERE c.age > :age order by id desc")
.pageSize(CHUNK_SIZE)
.entityManagerFactory(entityManagerFactory)
.parameterValues(Collections.singletonMap("age", 20))
.build();
}
// 방법 1: 생성자를 사용한 JpaPagingItemReader 설정
/*
@Bean
public JpaPagingItemReader<Customer> customerJpaPagingItemReader() throws Exception {
JpaPagingItemReader<Customer> jpaPagingItemReader = new JpaPagingItemReader<>();
jpaPagingItemReader.setQueryString(
"SELECT c FROM Customer c WHERE c.age > :age order by id desc"
);
jpaPagingItemReader.setEntityManagerFactory(entityManagerFactory);
jpaPagingItemReader.setPageSize(CHUNK_SIZE);
jpaPagingItemReader.setParameterValues(Collections.singletonMap("age", 20));
return jpaPagingItemReader;
}
*/
@Bean
public FlatFileItemWriter<Customer> customerJpaFlatFileItemWriter() {
return new FlatFileItemWriterBuilder<Customer>()
.name("customerJpaFlatFileItemWriter")
.resource(new FileSystemResource("./output/customer_new_v2.csv"))
.encoding(ENCODING)
.delimited().delimiter("\t")
.names("name", "age", "gender")
.build();
}
@Bean
public Step customerJpaPagingStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
log.info("------------------ Init customerJpaPagingStep -----------------");
return new StepBuilder("customerJpaPagingStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(customerJpaPagingItemReader())
.processor(new CustomerItemProcessor())
.writer(customerJpaFlatFileItemWriter())
.build();
}
@Bean
public Job customerJpaPagingJob(Step customerJpaPagingJob, JobRepository jobRepository) {
log.info("------------------ Init customerJpaPagingJob -----------------");
return new JobBuilder(JPA_PAGING_CHUNK_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(customerJpaPagingJob)
.build();
}
}
이전 실습에서 했던 것 처럼 application.yml에 실행할 JOB을 지정하고, JPA 관련 설정을 추가해주었다.
spring:
batch:
jdbc:
initialize-schema: always
job:
name: JPA_PAGING_CHUNK_JOB
enabled: true
jpa:
hibernate:
ddl-auto: update
show-sql: true
properties:
hibernate:
format_sql: true
Customer 테이블이 없다면 테이블을 만들어주고 mock data를 추가하였다.
우리가 지정한 출력경로에 customer_new_v2.csv
파일이 생성되었고, 앞서 작성했던 쿼리 조건에 맞는 데이터 (age가 20 이상인 데이터를 id 내림차순)를 알맞게 가져온 것을 확인할 수 있다.
방금 실행한 JOB이 잘 돌아갔나 확인하기 위해 Spring Batch의 메타데이터 스키마 BATCH_STEP_EXECUTION 테이블을 확인해보자
✅CHUNK_SIZE를 2로 지정했기 때문에, JdbcPagingItemReader 데이터를 한 번에 2개씩 읽어올 수 있다.
✅ BATCH_STEP_EXECUTION 테이블에서 내가 실행한 JOB의 READ_COUNT가 7로 표시된 것은 총 7개의 데이터를 읽었음을 확인할 수 있다.
✅ COMMIT_COUNT가 4로 나타난 것은 7개의 데이터를 2개씩 3번, 나머지 1개를 마지막으로 한 번 더 커밋하여 총 4번에 걸쳐 데이터를 처리했음을 확인할 수 있다.
실습 2. JpaItemWriter로 DB에 쓰기
JpaItemWriter
를 이용하여 CSV 파일로부터 데이터를 읽어와 데이터베이스에 insert하기
JpaItemWriter
는 Spring Batch에서 제공하는 ItemWriter 인터페이스를 구현하는 클래스로 데이터를 JPA를 통해 데이터베이스에 저장하는 데 사용된다.
장점
단점
EntityManagerFactory
: JPA EntityManager 생성을 위한 팩토리 객체로, 데이터베이스와의 연결을 설정한다.JpaQueryProvider
: 저장할 엔터티에 대한 JPA 쿼리를 생성하는 역할을 담당하여 필요한 조건이나 설정을 적용할 수 있게 한다. @Bean
public JpaItemWriter<Customer> jpaItemWriter() {
return new JpaItemWriterBuilder<Customer>()
.entityManagerFactory(entityManagerFactory)
.usePersist(true)
.build();
}
package com.example.springbatch.jobs.JpaPagingItemReader;
import com.example.springbatch.jobs.entity.Customer;
import jakarta.persistence.EntityManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;
@Slf4j
@Configuration
public class JpaItemJobConfig {
public static final int CHUNK_SIZE = 100;
public static final String ENCODING = "UTF-8";
public static final String JPA_ITEM_WRITER_JOB = "JPA_ITEM_WRITER_JOB";
@Autowired
EntityManagerFactory entityManagerFactory;
@Bean
public FlatFileItemReader<Customer> flatFileItemReader3() {
return new FlatFileItemReaderBuilder<Customer>()
.name("FlatFileItemReader")
.resource(new ClassPathResource("/static/customer.csv"))
.encoding(ENCODING)
.delimited().delimiter(",")
.names("name", "age", "gender")
.targetType(Customer.class)
.build();
}
@Bean
public JpaItemWriter<Customer> jpaItemWriter() {
return new JpaItemWriterBuilder<Customer>()
.entityManagerFactory(entityManagerFactory)
.usePersist(true)
.build();
}
@Bean
public Step customerJpaStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
log.info("------------------ Init flatFileStep -----------------");
return new StepBuilder("flatFileStep", jobRepository)
.<Customer, Customer>chunk(CHUNK_SIZE, transactionManager)
.reader(flatFileItemReader3())
.writer(jpaItemWriter())
.build();
}
@Bean
public Job customerJpaJob(Step customerJpaStep, JobRepository jobRepository) {
log.info("------------------ Init flatFileJob -----------------");
return new JobBuilder(JPA_ITEM_WRITER_JOB, jobRepository)
.incrementer(new RunIdIncrementer())
.start(customerJpaStep)
.build();
}
}
application.yml 파일에 실행할 Job의 이름을 JPA_ITEM_WRITER_JOB
로 지정해주었다.
spring:
batch:
job:
name: JDBC_BATCH_WRITER_CHUNK_JOB
resources/static
디렉토리에 customer.csv
파일을 추가해주었다.
unclebae,40,Male
superman,45,Male
WonderWoman,30,Female
스프링부트를 실행하면 customer 테이블에 새로운 데이터가 추가되었다.
기존 데이터 위에 CSV 파일로부터 읽어온 데이터가 정상적으로 삽입되었음을 확인할 수 있다.
JdbcPagingItemReader/JdbcBatchItemWriter
- JDBC를 통해 데이터베이스에서 데이터를 읽고 쓰는 구성 요소들이다.
JdbcPagingItemReader
는 SQL 쿼리를 사용하여 데이터베이스에서 원하는 데이터를 필터링하고 페이징 처리를 수행하며, RowMapper를 사용하여 결과를 자바 객체로 수동 매핑한다.JdbcBatchItemWriter
는 SQL 쿼리를 직접 작성하여 데이터를 데이터베이스에 삽입하는 방식이다.- SQL 제어가 필요할 때 유리하며, SQL을 통해 각 데이터 처리를 세밀하게 관리할 수 있다.
JpaPagingItemReader/JpaItemWriter
- JPA를 사용하여 엔티티를 기반으로 데이터베이스와 상호작용하는 구성 요소들이다.
JpaPagingItemReader
는 JPQL 쿼리를 통해 데이터베이스에서 엔티티를 페이징하여 읽어오며, JPA의 엔티티 매핑을 통해 데이터를 객체로 자동 변환한다.JpaItemWriter
는 JPA의 엔티티 매핑 기능을 통해 데이터를 데이터베이스에 자동으로 저장하는 방식으로, JPQL이나 엔티티 매니저를 사용하여 데이터베이스와 상호작용한다.- JPA를 사용하는 프로젝트에서 객체-관계 매핑이 필요할 때 적합하다.
오늘은 실습을 통해 JpaPagingItemReader
로 데이터베이스에서 데이터를 읽어와 파일로 저장하고, JpaItemWriter
로 파일 데이터를 데이터베이스에 일괄 저장하는 방법에 대해 알아보았다.
참고글 - [SpringBatch 연재 06] JpaPagingItemReader로 DB내용을 읽고, JpaItemWriter로 DB에 쓰기
깃허브 - https://github.com/hysong4u/springbatch