Spring-boot Batch ItemWriter

์žญ์žญ์ดยท2021๋…„ 4์›” 27์ผ
1

Spring-boot

๋ชฉ๋ก ๋ณด๊ธฐ
9/11
post-thumbnail

Spring-boot Batch ItemWriter

๐ŸŽ ๋ชฉ์ฐจ

0. ๊ฐœ์š”

ItemWriter๋Š” ์ถœ๋ ฅ ๊ธฐ๋Šฅ์ด๋‹ค.
์ดˆ์ฐฝ๊ธฐ Spring Batch๋Š” item์„ ํ•˜๋‚˜์”ฉ ๋‹ค๋ฃจ์—ˆ์ง€๋งŒ, ํ˜„์žฌ๋Š” chunk ๋‹จ์œ„๋กœ ๋ฌถ์ธ item List๋ฅผ ๋‹ค๋ฃฌ๋‹ค.
Reader์˜ read()๋Š” Item์„ ํ•˜๋‚˜ ๋ฐ˜ํ™˜ํ•˜๋Š” ๋ฐ˜๋ฉด, Writer์˜ write()๋Š” ์ธ์ž๋กœ Item List๋ฅผ ๋ฐ›๋Š”๋‹ค.
์ฆ‰, Reader์™€ Processor๋ฅผ ๊ฑฐ์ณ ์ฒ˜๋ฆฌ๋œ Item์„ Chunk๋‹จ์œ„ ๋งŒํผ ์Œ“์€ ๋’ค ์ด๋ฅผ Writer์—๊ฒŒ ์ „๋‹ฌํ•œ๋‹ค.

1. Database Writer

Java์—์„œ๋Š” JDBC๋˜๋Š” ORM์„ ์‚ฌ์šฉํ•˜์—ฌ RDBMS์— ์ ‘๊ทผํ•œ๋‹ค.
Spring Batch๋Š” JDBC์™€ ORM ๋ชจ๋‘ Writer๋ฅผ ์ œ๊ณตํ•œ๋‹ค.
Writer๋Š” Chunk๋‹จ์œ„์˜ ๋งˆ์ง€๋ง‰ ๋‹จ๊ณ„์ด๋ฉฐ ํ•ญ์ƒ ๋งˆ์ง€๋ง‰์— Flush๋ฅผ ํ•ด์ค˜์•ผ ํ•œ๋‹ค.
Writer๊ฐ€ ๋ฐ›์€ ๋ชจ๋“  Item์ด ์ฒ˜๋ฆฌ๋œ ํ›„, Spring Batch๋Š” ํ˜„์žฌ ํŠธ๋žœ์žญ์…˜์„ ์ปค๋ฐ‹ํ•œ๋‹ค.

Database Writer์˜ ์ข…๋ฅ˜๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค. ๋‹ค์Œ ์ค‘ boldํ‘œ์‹œ ๋œ ํ•ญ๋ชฉ์„ ์•Œ์•„๋ณด๊ฒ ๋‹ค.

  • JdbcBatchItemWriter
  • HibernateItemWriter
  • JpaItemWriter

2. JdbcBatchItemWriter

JdbcBatchItemWriter๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์ด ๋™์ž‘ํ•œ๋‹ค.
1. ChunkSize๋งŒํผ Query ๋ชจ์œผ๊ธฐ
2. ๋ชจ์•„๋†“์€ Query ํ•œ๋ฒˆ์— ์ „์†ก
3. ๋ฐ›์€ ์ฟผ๋ฆฌ๋“ค Database์—์„œ ์‹คํ–‰

2.1. Create JdbcBatchItemWriter

  • JdbcBatchItemWriterJobConfiguration.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcBatchItemWriterJobConfiguartion {
  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final DataSource dataSource;

  private static final int chunkSize = 10;

  @Bean
  public Job jdbcBatchItemWriterJob() {
    return jobBuilderFactory.get("jdbcBatchItemWriterJob")
            .start(jdbcBatchItemWriterStep())
            .build();
  }

  @Bean
  public Step jdbcBatchItemWriterStep() {
    return stepBuilderFactory.get("jdbcBatchItemWriterStep")
            .<Pay, Pay>chunk(chunkSize)
            .reader(jdbcBatchItemWriterReader())
            .writer(jdbcBatchItemWriter())
            .build();
  }

  @Bean
  public JdbcCursorItemReader<Pay> jdbcBatchItemWriterReader() {
    return new JdbcCursorItemReaderBuilder<Pay>()
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Pay.class))
                .sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
                .name("jdbcBatchItemWriterReader")
                .build();
  }

  // beanMapped()๋ฅผ ์‚ฌ์šฉํ•  ๋–„๋Š” ํ•„์ˆ˜
  @Bean
  public JdbcBatchItemWriter<Pay> jdbcBatchItemWriter() {
    // ColumnMapped()
    // return new JdbcBatchItemWriterBuilder<Map<String, Object>>()
    //             .dataSource(dataSource)
    //             .sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)")
    //             .columnMapped()
    //             .build();

    // BeanMapped()
    return new JdbcBatchItemWriterBuilder<Pay>()
                .dataSource(dataSource)
                .sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)")
                .beanMapped()
                .build();
  }
}
  • JdbcBatchItemWriter
    • reader์—์„œ ๋„˜์–ด์˜จ ๋ฐ์ดํ„ฐ๋ฅผ ํ•˜๋‚˜์”ฉ ์ถœ๋ ฅํ•˜๋Š” wrtier
  • JdbcBatchItemWriterBuilder
    • columnMapped ์„ค์ • : <Key, Value> ๊ธฐ๋ฐ˜์˜ insert SQL Values ๋งคํ•‘
      • ex) JdbcBatchItemWriterBuilder<Map<String, Object>>()
    • beanMapped ์„ค์ • : Pojo ๊ธฐ๋ฐ˜์˜ insert SQL Values ๋งคํ•‘
      • ex) JdbcBatchItemWriterBuilder<Pay>()

2.2 Create pay2 table

ItemWriter๋ฅผ ํ™œ์šฉํ•˜์—ฌ pay2๋ผ๋Š” ํ…Œ์ด๋ธ”์— ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฝ์ž…ํ•  ๊ฒƒ์ด๋‹ค.
mysql์— ์ ‘์†ํ•˜์—ฌ pay2 ํ…Œ์ด๋ธ”์„ ๋งŒ๋“ค์–ด์ค€๋‹ค.

create table pay2 (
  id         bigint not null auto_increment,
  amount     bigint,
  tx_name     varchar(255),
  tx_date_time datetime,
  primary key (id)
) engine = InnoDB;

2.3. Execute

./gradlew build -x test

java -jar ./build/libs/*.jar --job.name=jdbcBatchItemWriterJob

2.4. Check

pay2ํ…Œ์ด๋ธ”์„ ํ™•์ธํ•ด๋ณด๋ฉด ๋ฐ์ดํ„ฐ๊ฐ€ ๋“ค์–ด๊ฐ„ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

3. JpaItemWriter

ORM์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” JpaItemWriter์ด๋‹ค.
Writer์— ์ „๋‹ฌํ•˜๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ Entity ํด๋ž˜์Šค๋ผ๋ฉด JpaItemWriter๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค.

3.1. Create JpaItemWriter

JpaItemWriter๋Š” ๋„˜์–ด์˜จ Entity๋ฅผ Database์— ๋ฐ˜์˜ํ•œ๋‹ค.
์ฆ‰, JpaItemWriter๋Š” Entity ํด๋ž˜์Šค๋ฅผ ์ œ๋„ค๋ฆญ ํƒ€์ž…์œผ๋กœ ๋ฐ›์•„์•ผํ•œ๋‹ค.

  • JpaItemWriterJobConfiguration.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JpaItemWriterJobConfiguration {
  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final EntityManagerFactory entityManagerFactory;

  private static final int chunkSize = 10;

  @Bean
  public Job jpaItemWriterJob() {
    return jobBuilderFactory.get("jpaItemWriterJob")
            .start(jpaItemWriterStep())
            .build();
  }

  @Bean
  public Step jpaItemWriterStep() {
    return stepBuilderFactory.get("jpbItemWriterStep")
            .<Pay, Pay2>chunk(chunkSize)
            .reader(jpaItemWriterReader())
            .processor(jpaItemProcessor())
            .writer(jpaItemWriter())
            .build();
  }

  @Bean
  public JpaPagingItemReader<Pay> jpaItemWriterReader() {
    return new JpaPagingItemReaderBuilder<Pay>()
                .name("jpaItemWriterReader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT p FROM Pay p")
                .build();
  }

  @Bean
  public ItemProcessor<Pay, Pay2> jpaItemProcessor() {
    return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime());
  }

  @Bean
  public JpaItemWriter<Pay2> jpaItemWriter() {
    JpaItemWriter<Pay2> jpaItemWriter = new JpaItemWriter<>();
    jpaItemWriter.setEntityManagerFactory(entityManagerFactory);

    return jpaItemWriter;
  }
}
  • Pay2.java
@ToString
@Getter
@Setter
@NoArgsConstructor
@Entity
public class Pay2 {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss");

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private Long amount;
    private String txName;
    private LocalDateTime txDateTime;

    public Pay2(Long amount, String txName, String txDateTime) {
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
    }

    public Pay2(Long amount, String txName, LocalDateTime txDateTime) {
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = txDateTime;
    }

    public Pay2(Long id, Long amount, String txName, String txDateTime) {
        this.id = id;
        this.amount = amount;
        this.txName = txName;
        this.txDateTime = LocalDateTime.parse(txDateTime, FORMATTER);
    }
}
  • Pay2Repository.java
public interface Pay2Repository extends JpaRepository<Pay2, Long> { }

3.2. Execute

./gradlew build -x test

java -jar ./build/libs/*.jar --job.name=jpaItemWriterJob

3.3. Check

pay2ํ…Œ์ด๋ธ”์— ์ถ”๊ฐ€๋กœ ๋ฐ์ดํ„ฐ๊ฐ€ ๋“ค์–ด๊ฐ„ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

4. Custom ItemWriter

Writer๋ฅผ Customํ•˜๊ฒŒ ๊ตฌํ˜„ํ•ด์•ผ ํ•  ์ผ์€ ๋นˆ๋ฒˆํ•˜๋‹ค.
์˜ˆ๋ฅผ ๋“ค์–ด ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ฒฝ์šฐ๊ฐ€ ์žˆ๋‹ค.

  • Reader์—์„œ ์ฝ์–ด์˜จ ๋ฐ์ดํ„ฐ๋ฅผ RestTemplate์œผ๋กœ ์™ธ๋ถ€ API์— ์ „๋‹ฌํ•  ๊ฒฝ์šฐ
  • Singleton ๊ฐ์ฒด์— ๊ฐ’์„ ๋„ฃ์„ ๊ฒฝ์šฐ
  • ์—ฌ๋Ÿฌ Entity๋ฅผ saveํ•  ๊ฒฝ์šฐ

์ด๋Ÿฌํ•œ ๊ฒฝ์šฐ ItemWriter ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•˜๋ฉด ๋œ๋‹ค.
System.out.println์˜ ์—ญํ• ์„ ํ•˜๋Š” Writer๋ฅผ ๋งŒ๋“ค์–ด๋ณด์ž.

4.1. Create CustomItemWriter

  • CustomItemWriterJobConfiguration.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class CustomItemWriterJobConfiguration {
  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final EntityManagerFactory entityManagerFactory;

  private static final int chunkSize = 10;

  @Bean
  public Job customItemWriterJob() {
    return jobBuilderFactory.get("customItemWriterJob")
            .start(customItemWriterStep())
            .build();
  }

  @Bean
  public Step customItemWriterStep() {
    return stepBuilderFactory.get("customItemWriterStep")
            .<Pay, Pay2>chunk(chunkSize)
            .reader(customItemWriterReader())
            .processor(customItemWriterProcessor())
            .writer(customItemWriter())
            .build();
  }

  @Bean
  public JpaPagingItemReader<Pay> customItemWriterReader() {
    return new JpaPagingItemReaderBuilder<Pay>()
                .name("customeItemWriterReader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT p FROM Pay p")
                .build();
  }

  @Bean
  public ItemProcessor<Pay, Pay2> customItemWriterProcessor() {
    return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime());
  }

  @Bean
  public ItemWriter<Pay2> customItemWriter() {
    // Java8 ์ด์ƒ
    return items -> {
      for (Pay2 item : items) {
        System.out.println(item);
      }
    };

    // Java7 ์ดํ•˜
    // return new ItemWriter<Pay2>() {
    //   // ๋‹ค์Œ๊ณผ ๊ฐ™์ด write()๋ฅผ override๋กœ ๊ตฌํ˜„ํ•œ๋‹ค.
    //   @Override
    //   public void write(List<? extends Pay2> items) throws Exception {
    //     for (Pay2 item : items) {
    //       System.out.println(item);
    //     }
    //   }
    // };
  }
}

4.2. Execute

./gradlew build -x test

java -jar .\build\libs\demo-0.0.1-SNAPSHOT.jar --job.name=customItemWriterJob

4.3. Check

pay ํ…Œ์ด๋ธ”์˜ ๋ฐ์ดํ„ฐ๊ฐ€ pay2๋กœ ์ฒ˜๋ฆฌ๋˜์–ด ์ถœ๋ ฅ๋˜์—ˆ๋‹ค.


๋ชจ๋“  ์†Œ์Šค๋Š” ๊นƒํ—ˆ๋ธŒ์— ์˜ฌ๋ ค๋†“์•˜๋‹ค.
์ฐธ๊ณ ๋งํฌ: jojoldu ๋ธ”๋กœ๊ทธ

0๊ฐœ์˜ ๋Œ“๊ธ€