Spring Batch 삽질기 2탄

Miz·2021년 5월 28일
0

spring-batch

목록 보기
2/2

대부분 스프링 배치의 내용은 '기억보다는 기록을' 블로그를 참고하여 공부하였습니다.
배치에 대해 공부하고자 하시는분은 이 글 보다 아래 블로그를 보는게 훨씬 도움이 됩니다
https://jojoldu.tistory.com/
2탄에서 해결 방법은 guppy님의 블로그를 참고하였습니다.
https://wckhg89.github.io/archivers/springbatch2

1탄 https://velog.io/@miz/Spring-Batch-%EC%82%BD%EC%A7%88%EA%B8%B0-1%ED%83%84
2021-05-29 ConcurrentHashMap 수정

1. 기존상황

  • 결국 필드변수를 통한 동시성이 해결되지 않는 문제점이 발생했다.
  • 그후 guppy님의 블로그를 통해 step간 데이터 공유하는 방법에서 singleton bean을 사용하는 아이디어를 얻었다.
  • Singleton Bean을 사용하면서 얻는 이점은 ExecutionContext를 사용할때 metaDataSchema에 데이터를 저장하면서 발생하는 시간 낭비와 저장공간 낭비를 해결 할 수 있다.

2. 세번째 삽질

@Component
@Slf4j
public class DataShareBean <T> {

    private Map<String, T> shareDataMap;

    public DataShareBean() {
        this.shareDataMap = new ConcurrentHashMap<>();
    }

    public void putData(String key, T data) {
        if(shareDataMap == null) {
            log.debug("map is not initialize");
            return;
        }

        shareDataMap.put(key, data);
    }

    public T getData (String key) {

        if (shareDataMap == null) {
            return null;
        }

        return shareDataMap.get(key);
    }

/*
    public void addData(String key, T data) {
        if(shareDataMap == null) {
            log.debug("map is not initialize");
            return;
        }
        T t = shareDataMap.get(key);
        if(t == null) {
            shareDataMap.put(key,data);
            return;
        }

        shareDataMap.put(key, (T) Long.valueOf((Long) data + (Long) t));
    }
    */
    //제가 concurrentHashMap을 잘못쓰고 있어서 추가적인 수정을 하겠습니다. (1)
    public void addData(String key, T data) {
        shareDataMap.compute(key,(k,v) -> (T) Long.valueOf((Long) data + (Long) v));
    }
}

@Slf4j
@Configuration
@RequiredArgsConstructor
public class PayTotalJobThirdConfiguration {

    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    public static final String JOB_NAME = "PayTotalThirdJob";
    public static final String BEAN_PREFIX = JOB_NAME + "_";

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;
    private final TotalPayRepository totalPayRepository;
    private final DataShareBean<Long> dataShareBean;

    private final static int chunkSize = 1000;

    @Bean(JOB_NAME)
    public Job job() {
	    //제가 concurrentHashMap을 잘못쓰고 있어서 추가적인 수정을 하겠습니다. (2)
        dataShareBean.putData("totalAmount", 0L);
        return jobBuilderFactory.get(JOB_NAME)
                .start(step(null))
                .next(step2(null))
                .build();
    }

    @Bean(BEAN_PREFIX + "step")
    @JobScope
    public Step step(@Value("#{jobParameters[requestDate]}") String requestDate) {
        return stepBuilderFactory.get(BEAN_PREFIX + "step")
                .<Pay, Pay>chunk(chunkSize)
                .reader(reader(null))
                .writer(writer())
                .build();
    }

    @Bean(BEAN_PREFIX + "reader")
    @StepScope
    public JpaPagingItemReader<Pay> reader(@Value("#{jobParameters[requestDate]}") String requestDate) {
        return new JpaPagingItemReaderBuilder<Pay>()
                .name(BEAN_PREFIX + "reader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("select p from Pay p where to_char(tx_date_time,'yyyy-mm-dd') = '" + requestDate + "'")
                .build();
    }

    @Bean(BEAN_PREFIX + "writer")
    @StepScope
    public ItemWriter<Pay> writer() {
        return list -> {
            for(Pay pay : list) {
                log.info("Current = {}", pay);
                dataShareBean.addData("totalAmount", pay.getAmount());
            }
        };
    }

    @Bean(BEAN_PREFIX + "step2")
    @JobScope
    public Step step2(@Value("#{jobParameters[requestDate]}") String requestDate) {
        return stepBuilderFactory.get(BEAN_PREFIX + "step2")
                .tasklet((contribution, chunkContext) -> {

                    TotalPay totalPay = new TotalPay(dataShareBean.getData("totalAmount"),requestDate);

                    totalPayRepository.save(totalPay);

                    return RepeatStatus.FINISHED;
                }).build();
    }


}
  • 여기서는 DataShareBean이라는 객체를 만들고 객체를 빈으로 등록시킨 뒤 Di를 받아와서 싱글톤으로 된 빈을 사용하는 방법으로 접근합니다.
  • Spring Batch 특성상 MVC 처럼 데몬이 올라와서 계속해서 메모리가 쌓이는 형식이 아닌 한번 실행되고 꺼지는 방식으로 적용되기 때문에 따로 문제가 생기지는 않는다고 생각했습니다.
  • DataShareBean에서는 ConcurrentHashMap을 통해서 스레드 세이프하게 생성하였습니다.
    - AtomicLong을 통해 하여도 문제는 없지만, 추후 확장성을 고려하여 HashMap을 사용하였습니다.
  1. 첫번째 Step에서 Reader를 통해 읽은 정보를 Writer에서 DataSourceBean에 있는 특정 값을 계속해서 업데이트 해준다.
  2. 두번쪠 Step에서 DataSourceBean에 있는 값을 가지고 와 TotalPay를 생성한뒤 저장한다.

추가수정

  1. ConcurrentHashMap 수정
    • 그냥 무작정 concurrentHashMap을 사용할 경우 Thread Safe 한지 알았지만 테스트 과정에서 이게 아니란걸 알게되었습니다.
    • compute() 함수를 통해 thread-safe하게 동작
    • job 시작하기전에 dataSourceBean 초기화 추가
    • 이렇게 쓰기 싫으신 분들은 AtomicLong을 통해 작업하겠습니다.

결론

  • 이게 과연 Best Practice일까에 대한 고민이 더 필요하다
  • 혹시라도 이 방법으로 하실분들은 Best Practice라고 생각하지 마시고 다른 방법도 한번 생각해보시고 의견을 주시면 감사하겠습니다!
  • 다음에는 병렬처리 적용해보겠습니다.
profile
2년차 백엔드 개발자

0개의 댓글