[Spring-batch] getting-started

오늘내일·2024년 6월 4일
0
post-custom-banner

spring-batch를 이해하기에 저번 2분 튜토리얼을 부족한 것 같아서 공식문서의 getting-started guide를 하나씩 따라하며 정리해본다. 나와 같이 spring-batch에 대해 감을 잡기 위해 도움이 될까 해서 정리해본다. 정리하면서 궁금한 부분은 GPT에 물어보거나 검색하였다.

0. 실습 목표

  • csv 파일을 읽어서 각 데이터를 대문자로 변환하여 db에 저장한다.

1. 의존성 추가

  • 실습에 필요한 spring-batch-core와 내장 DB를 사용하기 위해 hsql 의존성을 추가한다. 나는 gradle을 사용하여 아래와 같이 의존성을 추가했다.
 		implementation 'org.springframework.batch:spring-batch-core'
    runtimeOnly 'org.hsqldb:hsqldb'

2. 샘플 데이터 준비

  • 샘플 데이터를 담고 있는 csv 파일을 아래 경로와 같이 준비한다.
  • 경로 : src/main/resources/sample-data.csv
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
  • 샘플 데이터를 변환하여 DB에 저장할 테이블을 만들기 위한 SQL script를 아래 경로와 같이 준비한다.
  • 경로 : src/main/resources/schema-all.sql
DROP TABLE people IF EXISTS;

CREATE TABLE people  (
                         person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
                         first_name VARCHAR(20),
                         last_name VARCHAR(20)
);

💡 hsql 의존성이 있으면 애플리케이션이 실행되면 Spring Boot가 자동으로 schema-all.sql 파일을 읽고 실행하여 테이블을 생성합니다.

3. 비즈니스 객체 생성

  • csv 파일에서 읽어온 데이터를 담을 수 있는 객체를 만든다.
  • 나는 이 객체를 결국 DB에 저장할 것이므로 Entity와 유사하다고 생각하여, entity 패키지 아래 만들었다.
package com.study.gettingstartedguide.entity;

public record Person(String firstName, String lastName) {

}

4. 데이터 변환 객체 생성

  • csv 파일에서 읽어와 대문자로 변환하는 작업을 할 객체와 메서드를 만든다.
  • batch 작업 과정에서 이러한 데이터 변환 작업은 ItemProcessor 인터페이스에 해당하므로 ItemProcessor 인터페이스를 구현한다.
  • ItemProcessor 인터페이스를 살펴보면 아래와 같은데 제네릭의 I는 process 메서드의 파라미터로 입력받을 객체의 타입, O는 process 메서드가 반환할 객체의 타입을 의미한다.
@FunctionalInterface
public interface ItemProcessor<I, O> {

	@Nullable
	O process(@NonNull I item) throws Exception;

}
  • 구현한 PersonItemProcessor 객체는 아래와 같다. 대문자 변환 작업을 담고 있다.
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
  private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

  @Override
  public Person process(final Person person) {
    final String firstName = person.firstName().toUpperCase();
    final String lastName = person.lastName().toUpperCase();

    final Person transformedPerson = new Person(firstName, lastName);

    log.info("Converting (" + person + ") into (" + transformedPerson + ")");

    return transformedPerson;
  }
}

5. ItemReader, ItemProcessor, ItemWriter Bean 등록

  • spring-batch 2분 튜토리얼에서 batch 작업은 크게 Job을 하나의 단위로 실행하고, Job 안에 하나 또는 여러 Step이 존재하고, Step 안에 하나 또는 여러 Chunk가 존재한다고 했다. 그리고 Chunk는 데이터를 읽고(ItemReader), 처리하고(ItemProcessor), 기록하는(ItemWriter) 단위이다. 따라서 chunk를 구성하기 위해 reader, processor, writer를 아래와 같이 빈 등록해야 한다.
@Configuration
public class BatchConfiguration {

  @Bean
  public FlatFileItemReader<Person> reader() {
    return new FlatFileItemReaderBuilder<Person>()
        .name("personItemReader")  
        .resource(new ClassPathResource("sample-data.csv"))
        .delimited()
        .names("firstName", "lastName")
        .targetType(Person.class)
        .build();
  }

  @Bean
  public PersonItemProcessor processor() {
    return new PersonItemProcessor();
  }

  @Bean
  public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Person>()
        .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
        .dataSource(dataSource)
        .beanMapped()
        .build();
  }

}

각각 자세히 살펴보자

  • reader() 메소드
    • spring-batch에서 기본적으로 csv 파일을 읽을 수 있는 FlatFileItemReader를 제공한다. 제네릭의 타입은 csv 파일을 파싱하여 만들 객체 타입이다.
    • FlatFileItemReaderBuilder의 메서드들을 살펴보자.
      • name() : FlatFileItemReader 인스턴스에 이름을 지정(객체와 인스턴스 차이는 아래 별도 정리)
      • resource() : reader가 읽을 파일 위치 지정
      • delimited() : csv 파일을 읽을 때 사용할 구분자 설정(기본 구분자는 쉼표)
      • names() : csv 파일 각 column에 해당하는 필드 이름을 지정(1열 : firstName, 2열 : lastName)
      • targetType() : csv 파일을 파싱하여 만들 객체 타입을 지정
  • processor() 메소드
    • 앞서 구성한 PersonItemProcessor 객체를 반환한다.
  • writer() 메소드
    • spring-batch는 대량의 데이터를 DB에 효율적으로 저장하기 위해 JdbcBatchItemWriter를 제공한다. 따라서 DB connection을 관리하는 dataSource를 파라미터로 입력 받는다.
    • JdbcBatchItemWriterBuilder의 메서드들을 살펴보자.
      • sql() : 삽입할 sql 쿼리를 설정한다.
      • dataSource() : DB 연결정보 설정
      • beanMapped() : sql 쿼리에서 작성한 ‘:firstName’과 ‘:lastName’을 Person 객체의 프로퍼티로 매핑하는 기능을 제공한다.

💡 객체와 인스턴스의 비교

  • 객체 : 데이터와 기능을 묶어서 캡슐화한 것을 강조
  • 인스턴스 : 클래스의 특정 실체로, 메모리 상에 존재하는 구체적인 값을 강조

6. JobExecutionListener 구현

  • Job을 구성하기에 앞서 Job이 완료 후 실행할 기능을 구성할 수 있는 JobCompletionNotificationListener를 먼저 만들어보자. 아래 코드를 보면 알 수 있지만 JobCompletionNotificationListener는 JobExecutionListener를 구현한 객체이다. 아래 코드는 afterJob 메서드를 구현하였지만, beforeJob 메서드를 구현하여 Job이 실행되기 전에 작동할 기능을 구현할 수도 있다.
@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

  private static final Logger log = LoggerFactory.getLogger(
      JobCompletionNotificationListener.class);

  private final JdbcTemplate jdbcTemplate;

  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  @Override
  public void afterJob(JobExecution jobExecution) {
    if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
      log.info("!!! JOB FINISHED! Time to verify the results");

      jdbcTemplate.query("SELECT first_name, last_name FROM people",
              new DataClassRowMapper<>(Person.class))
          .forEach(person -> log.info("Found <{{}}> in the database.", person));
    }

  }
}
  • 위 코드를 간단히 살펴보면 Job이 완료되면 DB의 people 테이블에서 데이터를 조회하여 출력하는 기능이므로, sql 쿼리를 실행하기 위해 JdbcTemplate 객체를 생성자로 주입받는다. 참고로 DataClassRowMapper 객체는 DB 쿼리 결과로부터 얻은 행(Row) 데이터를 Java 객체로 변환하는데 사용되는 객체이다.

7. Job, Step 설정

  • 위에 작성한 BatchConfiguration 에 이제 Job, Step을 아래와 같이 구성해보자.
 	@Bean
  public Step step1(JobRepository jobRepository,
      DataSourceTransactionManager transactionManager, FlatFileItemReader<Person> reader,
      PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
    return new StepBuilder("step1", jobRepository)
        .<Person, Person> chunk(3, transactionManager)
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .build();
  }

  @Bean
  public Job importUserJob(JobRepository jobRepository, Step step1,
      JobCompletionNotificationListener listener) {
    return new JobBuilder("importUserJob", jobRepository)
        .listener(listener)
        .start(step1)
        .build();
  }

코드를 살펴보자

  • step1 메서드
    • 실행할 step을 구성한다. 2분 튜토리얼에서는 tasklet을 이용하여 하나의 단일 작업을 구성 했다면, 이번에는 앞서 구현한 reader, processor, writer와 jobRepository, transactionManager를 파라미터로 입력 받아서 구성한다.
    • StepBuilder는 step 인스터스 이름과 meta-data를 저장할 jobRepository를 파라미터로 입력한다.
      • <Person, Person> chunk() : 청크 기반 처리를 설정한다.
        • 제네릭은 앞서 구성한 ItemProcessor의 입력값과 반환값의 타입이다.
        • 각 청크마다 트랜잭션이 관리되기 때문에 transactionManager도 파라미터로 입력한다.
        • 청크 사이즈를 의미하는데 3개의 아이템이 처리될 때마다 트랜잭션이 커밋된다.
          • 예를 들어 A, B, C, D, E 아이템이 있다면 청크 사이즈가 3인 경우 A, B, C 아이템을 read, process, write 과정을 거친 후 트랜잭션이 커밋된 후, D, E 아이템을 read, process, write과정을 거친 후 또 트랜잭션이 커밋된다.(하나의 Item을 하나의 Row라고 생각해도 무방할 듯 하다.)
      • reader() : Step에서 사용할 ItemReader를 설정한다.
      • processor() : Step에서 사용할 ItemProcessor를 설정한다.
      • writer() : Step에서 사용할 ItemWriter를 설정한다.
  • importUserJob 메서드
    • batch 관련 meta-data를 저장할 jobRepository, 실행시킬 Step, 앞서 구성한 JobCompletionNotificationListener를 파라미터로 입력받아 Job을 구성한다.

💡 @EnableBatchProcessing 어노테이션이 없는데도 jobRepository가 만들어지는게 신기하다. 어떤 과정에서 만들어지는 것인가 찾아봤는데 spring-batch-core 의존성이 있으면, @SpringBootApplication 어노테이션에 의해서 batch 관련 component들이 자동 생성된다고 한다. 스프링 부트를 사용하지 않고 스프링을 사용하면 생성되지 않을 듯 하다.

profile
다시 시작합니다.
post-custom-banner

0개의 댓글