Spring-boot Batch ItemProcessor

์žญ์žญ์ดยท2021๋…„ 4์›” 28์ผ
1

Spring-boot

๋ชฉ๋ก ๋ณด๊ธฐ
10/11
post-thumbnail

Spring-boot Batch ItemProcessor

๐ŸŽ Contents

0. Summary

ItemProcessor๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€๊ณตํ•˜๊ฑฐ๋‚˜ ํ•„ํ„ฐ๋งํ•˜๋Š” ์—ญํ• ์ด๋‹ค.
ItemWriter์—์„œ ์ถฉ๋ถ„ํžˆ ๊ตฌํ˜„ ๊ฐ€๋Šฅํ•˜๊ณ  ๋”ฐ๋ผ์„œ Processor๋Š” ํ•„์ˆ˜๊ฐ€ ์•„๋‹ˆ๋‹ค.
ํ•˜์ง€๋งŒ ์ฝ๊ธฐ/์ฒ˜๋ฆฌ/์“ฐ๊ธฐ๋ฅผ ๋ถ„๋ฆฌํ•  ์ˆ˜ ์žˆ์–ด ์‚ฌ์šฉ์„ ๊ถŒ์žฅํ•œ๋‹ค.

1. ItemProcessor

ItemReader์—์„œ ๋„˜๊ฒจ์ค€ ๋ฐ์ดํ„ฐ ๊ฐœ๋ณ„๊ฑด์„ ๊ฐ€๊ณต/์ฒ˜๋ฆฌํ•œ๋‹ค.
๋ฐ์ดํ„ฐ๋ฅผ ์›ํ•˜๋Š” ํƒ€์ž…์œผ๋กœ ๋ณ€ํ™˜ํ•˜๊ฑฐ๋‚˜ ์›ํ•˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ํ•„ํ„ฐ๋งํ•˜์—ฌ ์ „๋‹ฌํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

ItemProcessor๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์ด ๊ตฌ์„ฑ๋˜์–ด ์žˆ๋‹ค.

public interface ItemProcessor<I, O> {
  O process(I item) throws Exception;
}
  • I
    • ItemReader์—์„œ ๋ฐ›์„ ๋ฐ์ดํ„ฐ ํƒ€์ž…
  • O
    • ItemWriter์—๊ฒŒ ๋ณด๋‚ผ ๋ฐ์ดํ„ฐ ํƒ€์ž…

ItemReader๊ฐ€ ์ฝ์€ ๋ฐ์ดํ„ฐ๋ฅผ ItemProcessor์˜ process()๋ฅผ ํ†ต๊ณผํ•˜์—ฌ ItemWriter์—๊ฒŒ ์ „๋‹ฌ๋œ๋‹ค.
์ฝ”๋“œ์˜ ์–‘์ด ๋งŽ์•„์ง€๋ฉด ๋ณ„๋„์˜ ํด๋ž˜์Šค๋กœ ๋ถ„๋ฆฌํ•ด์„œ ์“ฐ๊ธฐ๋„ ํ•˜์ง€๋งŒ ๋ณดํ†ต์€ ์ต๋ช…ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

2. ItemProcessor Transformation

ItemReader๋กœ ์ฝ์–ด์˜จ ๋ฐ์ดํ„ฐ์˜ ํƒ€์ž…์„ ๋ณ€ํ™˜์‹œ์ผœ ItemWriter์— ์ „๋‹ฌํ•˜๋Š” ์˜ˆ์ œ์ด๋‹ค.

2.1. Create convert processor

  • ProcessorConvertJobConfiguration.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorConvertJobConfiguration {
  public static final String JOB_NAME = "processorConvertBatch";
  public static final String BEAN_PREFIX = JOB_NAME + "_";

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final EntityManagerFactory entityManagerFactory;  

  @Value("${chunkSize:1000}")
  private int chunkSize;

  @Bean(JOB_NAME)
  public Job job() {
    return jobBuilderFactory.get(JOB_NAME)
            .preventRestart()
            .start(step())
            .build();
  }

  @Bean(BEAN_PREFIX + "step")
  @JobScope
  public Step step() {
    return stepBuilderFactory.get(BEAN_PREFIX + "step")
            .<Pay, String>chunk(chunkSize)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
  }

  @Bean
  public JpaPagingItemReader<Pay> reader() {
    return new JpaPagingItemReaderBuilder<Pay>()
                .name(BEAN_PREFIX+"reader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT t FROM Pay t")
                .build();
  }

  @Bean
  public ItemProcessor<Pay, String> processor() {
    return pay -> {
      return pay.getTxName();
    };
  }

  private ItemWriter<String> writer() {
    return items -> {
      for (String item: items) {
        log.info("Pay TxName={}", item);
      }
    };
  }
}

2.2. Execute

./gradlew build -x test

java -jar ./build/libs/*.jar --job.name=processorConvertBatch

2.3. Check

Payํด๋ž˜์Šค๊ฐ€ ItemProcessor๋ฅผ ํ†ตํ•ด String์œผ๋กœ ์ž˜ ๋ณ€ํ™˜๋œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

3. ItemProcessor Filter

ItemReader๋กœ ์ฝ์–ด์˜จ ๋ฐ์ดํ„ฐ ์ค‘ ํ•„ํ„ฐ๋ง ๋œ ๋ฐ์ดํ„ฐ๋งŒ ItemWriter์—๊ฒŒ ์ „๋‹ฌํ•˜๋Š” ์˜ˆ์ œ์ด๋‹ค.

3.1. Create filter processor

  • ProcessorConvertJobConfiguration.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorNullJobConfiguration {
  
  public static final String JOB_NAME = "processorNullBatch";
  public static final String BEAN_PREFIX = JOB_NAME + "_";

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;
  private final EntityManagerFactory emf;

  @Value("${chunkSize:1000}")
  private int chunkSize;

  @Bean(JOB_NAME)
  public Job job() {
    return jobBuilderFactory.get(JOB_NAME)
            .preventRestart()
            .start(step())
            .build();
  }

  @Bean(BEAN_PREFIX + "step")
  @JobScope
  public Step step() {
    return stepBuilderFactory.get(BEAN_PREFIX + "step")
            .<Pay, Pay>chunk(chunkSize)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
  }

  @Bean(BEAN_PREFIX + "reader")
  public JpaPagingItemReader<Pay> reader() {
    return new JpaPagingItemReaderBuilder<Pay>()
            .name(BEAN_PREFIX+"reader")
            .entityManagerFactory(emf)
            .pageSize(chunkSize)
            .queryString("SELECT t FROM Pay t")
            .build();
  }

  @Bean(BEAN_PREFIX + "processor")
  public ItemProcessor<Pay, Pay> processor() {
    return pay -> {

      boolean isIgnoreTarget = pay.getAmount() > 2000L;
      if(isIgnoreTarget){
        log.info(">>>>>>>>> Pay txName={}, isIgnoreTarget={}", pay.getTxName(), isIgnoreTarget);
        return null;
      }

      return pay;
    };
  }

  private ItemWriter<Pay> writer() {
    return items -> {
      for (Pay item : items) {
        log.info("pay txName={}", item.getTxName());
      }
    };
  }
}

3.2. Execute

./gradlew build -x test

java -jar ./build/libs/*.jar --job.name=processorNullBatch

3.3. Check

Pay ํ…Œ์ด๋ธ”์˜ row ์ค‘ amount๊ฐ€ 3000์ด์ƒ์ธ ๊ฒƒ์„ ์ œ์™ธํ•œ ๋‚˜๋จธ์ง€๊ฐ€ ํ•„ํ„ฐ๋ง๋˜์–ด์กŒ๋‹ค.

4. ItemProcessor chaining

ItemProcessor๊ฐ€ ์—ฌ๋Ÿฌ ๊ธฐ๋Šฅ์„ ํฌํ•จํ•  ๊ฒฝ์šฐ๋ฅผ ์ƒ๊ฐํ•ด๋ณด์ž.
ํ•˜๋‚˜์˜ ItemProcessor ์—ญํ• ์ด ์ ์  ์ปค์ง€๊ฒŒ ๋  ๊ฒƒ์ด๋‹ค.

์ด๋ฅผ ์œ„ํ•ด CompositeItemProcessor ๊ตฌํ˜„์ฒด๊ฐ€ ์กด์žฌํ•œ๋‹ค.
์ด๋Š” ItemProcessor๊ฐ„์˜ ์ฒด์ด๋‹์„ ์ง€์›ํ•œ๋‹ค.

4.1. Create Composite Processor

  • ProcessorCompositeJobConfiguration.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorCompositeJobConfiguration {

    public static final String JOB_NAME = "processorCompositeBatch";
    public static final String BEAN_PREFIX = JOB_NAME + "_";

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory emf;

    @Value("${chunkSize:1000}")
    private int chunkSize;

    @Bean(JOB_NAME)
    public Job job() {
        return jobBuilderFactory.get(JOB_NAME)
                .preventRestart()
                .start(step())
                .build();
    }

    @Bean(BEAN_PREFIX + "step")
    @JobScope
    public Step step() {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
                .<Pay, String>chunk(chunkSize)
                .reader(reader())
                .processor(compositeProcessor())
                .writer(writer())
                .build();
    }

    @Bean(BEAN_PREFIX + "reader")    
    public JpaPagingItemReader<Pay> reader() {
        return new JpaPagingItemReaderBuilder<Pay>()
                .name(BEAN_PREFIX+"reader")
                .entityManagerFactory(emf)
                .pageSize(chunkSize)
                .queryString("SELECT t FROM Pay t")
                .build();
    }

    @Bean(BEAN_PREFIX + "processor")
    public CompositeItemProcessor compositeProcessor() {
        List<ItemProcessor> delegates = new ArrayList<>(2);
        delegates.add(processor1());
        delegates.add(processor2());

        CompositeItemProcessor processor = new CompositeItemProcessor<>();

        processor.setDelegates(delegates);

        return processor;
    }

    public ItemProcessor<Pay, String> processor1() {
        return Pay::getTxName;
    }

    public ItemProcessor<String, String> processor2() {
        return name -> "ํŠธ๋žœ์žญ์…˜ ์ด๋ฆ„์€"+ name + "์ž…๋‹ˆ๋‹ค.";
    }

    private ItemWriter<String> writer() {
        return items -> {
            for (String item : items) {
                log.info("Pay Name={}", item);
            }
        };
    }

4.2. Execute

./gradlew build -x test

java -jar .\build\libs\demo-0.0.1-SNAPSHOT.jar --job.name=processorCompositeBatch

4.3. Check

Pay์—์„œ String(txName)์œผ๋กœ ๋ณ€ํ™˜๋˜๊ณ , String์ด ๋˜๋‹ค๋ฅธ String์œผ๋กœ ๋ณ€ํ™˜๋œ ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.


๋ชจ๋“  ์†Œ์Šค๋Š” ๊นƒํ—ˆ๋ธŒ์— ์˜ฌ๋ ค๋†“์•˜๋‹ค.
์ฐธ๊ณ ๋งํฌ: jojoldu ๋ธ”๋กœ๊ทธ

0๊ฐœ์˜ ๋Œ“๊ธ€