스프링배치 데이터처리하기

박태훈 ·2024년 11월 22일

스프링

목록 보기
1/1

스프링배치?


  • 스프링 프레임워크에서 배치작업을 안정적이고 효율적으로 수행할수 있도록 지원하는 라이브러리.
  • 배치작업은 대량의 데이터를 처리, 주기적이고 반복적인 작업을 실행하는 데 사용된다
  • 안정적이고 효율적으로 처리하기 위해 스프링배치를 이용한다.

배치작업이란?

  • 주로 데이터 처리를 자동화하기 위해 사용되는 작업
  • 대량의 데이터를 일괄처리 하는걸 목표로한다.
  • 정기적인 갱신, 대규모 데이터전송, 보고서 생성 등 다양한 용도로 사용된다.

스프링배치 사용이유

주식모의투자 프로젝트에서 사용자의 수익률을 구하는 로직을 작성해야 하는 상황에 맞딱드렸다.
수익률은 Kospi 장이 마감되는 16시에 진행된다. 사용자의 지갑데이터에 존재하는 자산을 비교분석해 일괄적으로 수익률을 집계 해야하기 때문에 스프링배치를 이용하기로 결정했다.

일일, 주간, 월간 수익률을 정산해야하기때문에 스프링배치와 스케쥴러를 통해 로직이 실행되도록 하기로 결정했다.


스프링배치 용어 정리

  • 스프링배치는 기본적으로 Job 단위로 움직인다.
  • 배치를 사용하기전 기본적인 용어는 알아야 사용할수있다.
  • Job : 스프링배치의 기본 처리 단위
  • JobLancher : Job과 JobParameters 를 사용해 Job을 실행하는 객체
  • Step : 배치처리를 정의하고 제어하는데 필요한 모든 정보가 들어 있는 객체
  • JobRepository : 배치 처리 정보를 담고 있는 메커니즘
  • JobInstance : 배치에서 Job이 실행될 때 하나의 Job 실행단위
  • JobExcution : JobInstance 실행 시도에 대한 객체
  • JobBuilderFactory : Job을 실행하기 위한 객체 JobBuilder를 생성할수 있다.
  • JobBuilder : Step을추가해 가장 기본이 되는 SimpleJobBuilder를 생성한다.
  • Step : 처리해야 할 작업 내용 Job은 하나 이상의 Step으로 이루어지며 각 Step은 순차적으로 처리된다.
  • StepExcution : Step의 실행 시도에 대한 객체
  • ExecutionContext : Step 혹은 Job의 실행 중 발생하는 데이터를 저장하고 공유함
  • JobLancher : Job과 JobParameters를 사용해 Job을 실행, 생명주기를 관리한다.
  • ItemReadear : 배치 작업에서 처리할 아이템을 읽어오기 위한 인터페이스
  • ItemProcessor : ItemReader로부터 읽어온 데이터를 처리
  • ItemWriter : 배치 처리된 데이터를 최종적으로 DB에 저장
  • Chunk : 한번에 처리될 데이터의 수를 의미 해당 단위로 트랜잭션을 수행한다.

구현내용

개발환경

  • Java - 17
  • SpringBoot - 3.2.5
  • SpringBatcch - 5 버전이상
  • ORM - JPA

설정

//spring batch
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    testImplementation 'org.springframework.batch:spring-batch-test'
  • Gradle 파일에 스프링배치 Dependency를 설정해줍니다.
spring:
  application:
    name: rank-server
  batch:
    jdbc:
        initialize-schema: always
  • yml 파일에 batch작업시 발생할 데이터를 저장할 메타테이블 생성 설정을 해줍니다.
  • 4버전 이하에서는 메타테이블을 생성안해도 배치가 작동됩니다.
  • 5버전부터는 메타테이블이 존재해야만 배치작업이 정상적으로 동작합니다.
  • 만약 yml 설정을 제대로했지만 메타테이블이 생성되지 않을경우 쿼리문을 이용해 테이블을 생성해주면됩니다.
    메타테이블생성쿼리

로직설명

  • 지갑 DB의 내용을 FeignClient를 이용해 랭킹DB로 불러온다.
  • 그후 불러온 사용자의 자산 데이터를 정산해 수익률을 계산한다.
  • 계산된 수익률을 기준으로 랭킹 컬럼을 update 시킨후 랭킹 정산을 한다.
  • 해당 프로젝트는 MSA 아키텍처 기반이기 때문에 지갑DB, 랭킹 DB가 각기다른 서버에 존재한다.
  • 그래서 FeignClient를 이용해 지갑 데이터를 불러온다
  • 배치작업 시간동안 배치가 진행중인 서버에 트랜잭션 오류들을 방지하기 위해 API 호출은 진행되지 않는다.

배치로직

 @Bean
    public Step dailyRankingStep(JobRepository jobRepository,
            PlatformTransactionManager transactionManager) {

        return new StepBuilder("dailyRankingStep", jobRepository)
                .<DailyWallet, DailyRankingDto>chunk(10, transactionManager)
                .reader(dailyRankingReader())
                .processor(dailyRankingProcessor())
                .writer(dailyRankingWriter())
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(100)
                .listener(customSkipListner)
                .retry(Exception.class)
                .retryLimit(3)
                .listener(customSkipListner)
                .build();
    }

Step 설정 부분이다.
StepBuilder를 통해 Step의 세부사항들을 설정했다.
chunk단위는 10으로 설정하였고, reader, processor, writer를 선언해줬다.
.faultTolerant를 활성화 시켜 skip, retry 설정을 통해 예외처리를 하였다.
DailyWallet은 지갑서버로 부터 받아온 데이터들이 JPA에 매핑되어있는 엔티티 클래스이다.

 @Bean
    public ItemReader<DailyWallet> dailyRankingReader() {
        return new RepositoryItemReaderBuilder<DailyWallet>()
                .name("readWalletInfo")
                .repository(dailyWalletRepository)
                .methodName("findAll")
                .pageSize(10)
                .sorts(Collections.singletonMap("uuid", Sort.Direction.ASC))
                .build();
    }

Reader메소드이다.
DailyWallet엔티티를 페이징해서 읽어온다.
dailyWalletRepository에 선언한 findAll 메소드를 통해 모든 지갑데이터를 읽어온다.
여기서 uuid를 기준으로 오름차순 정렬을 진행한다.
정렬을 진행하는 이유는 신규회원과 기존 데이터에 존재하는 회원 사이에 순서를 맞추기 위해서 정렬작업을 진행했다.

    @Bean //수익률 구하는 processor
    public ItemProcessor<DailyWallet, DailyRankingDto> dailyRankingProcessor() {
        return dailyWallet -> {
            if (dailyWallet.getYesterdayWon() == null && dailyWallet.getTodayWon() == null) {
                throw new CustomException(BaseResponseCode.NO_DATA);
            }
            double profit = ((double) (dailyWallet.getTodayWon()  //수익률 구하는 연산
>                     - dailyWallet.getYesterdayWon()) / dailyWallet.getYesterdayWon()) * 100;

            //수익률 소숫점 3자리로 제한
            BigDecimal roundedProfit =
                    new BigDecimal(profit).setScale(3, RoundingMode.HALF_UP);

            return DailyRankingDto
                    .builder()
                    .uuid(dailyWallet.getUuid())
                    .todayWon(dailyWallet.getTodayWon())
                    .profit(roundedProfit.doubleValue())
                    .nickname(dailyWallet.getNickname())
                    .build();
        };
    }
    

Processor메소드이다.
DailyWaller 데이터를 기반으로 수익률을 계산하고 결과를 DTO 객체로 변환한다.
전일 금액과 금일 금액을 기준으로 수익률을 계산한다.
profit을 이용해 소숫점 3자리까지 반올림 처리한후 DailyRankingDTO객체로 매핑.
만약 전일과 금일 금액이 null이면 에러Exception을 터트린다.
수익률 연산이 진행되지 않기때문이다.

 @Bean
    public ItemWriter<DailyRankingDto> dailyRankingWriter() {
        return items -> {
            for (DailyRankingDto item : items) {

                if(dailyRankingRepository.existsByUuid(item.getUuid())){
                    dailyRankingQueryDslmp.updateDailyRanking(item);
                }else {
                    DailyRanking dailyRanking = DailyRanking.builder()
                        .uuid(item.getUuid())
                        .won(item.getTodayWon())
                        .profit(item.getProfit())
                            .nickname(item.getNickname()).build();
                        dailyRankingRepository.save(dailyRanking);
                }
            }
            log.info("save dailyRanking");
        };
    }

Writer 메소드이다.
수익률이 계산된 데이터를 데이터베이스에 저장하거나 Update한다.
uuid를 이용해 DailyWallet 엔티티에 데이터가 존재하는지 확인한후, 데이터가 존재하면 Update 아니면 데이터를 저장한다.
배치작업을 통해 수익률을 계산해 DB에 저장한다.
배치 작업이 종료된후 정산된 수익률을 이용해 순위를 다시 선정하면서 순위집계 작업은 마무리가된다.
스프링배치의 개념과 사용법을 정리한 글이기때문에 다른내용들은 다루지 않았습니다.

 @Scheduled(cron = "0 50 15 ? * MON-FRI") //일간 수익률 집계

    public void dailyRankingBatchStart()
            throws Exception{

        log.info("start dailyRankingBatch");
        jobLauncher.run((Job) dailyRankingJobConfig.dailyRankingJob(
                jobRepository,platformTransactionManager), new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters());

    }
  • Job을 실행시키기 메소드이다.
  • Scheduled 어노테이션을 이용해 Job이 실행되는 시간을 설정했다.
  • Job을 실행시키기 위해 JobLancher를 이용했다.
  • 배치작업시 JobParameters를 생성해 전달한다.
  • 배치는 JobParameter가 고유해야 같은 Job을 중복 실행하지 않기 때문임.

전체코드

@Slf4j
@Configuration
@RequiredArgsConstructor
public class DailyRankingJobConfig {

    private final DailyWalletRepository dailyWalletRepository;
    private final DailyRankingRepository dailyRankingRepository;
    private final DailyRankingQueryDslmp dailyRankingQueryDslmp;


    @Autowired
    private CustomSkipListner customSkipListner;

    @Bean //일일랭킹 집계
    public Job dailyRankingJob(JobRepository jobRepository,
            PlatformTransactionManager transactionManager) {

        return new JobBuilder("dailyRankingJob", jobRepository)
                .start(dailyRankingStep(jobRepository, transactionManager))
                .build();

    }

    @Bean
    public Step dailyRankingStep(JobRepository jobRepository,
            PlatformTransactionManager transactionManager) {

        return new StepBuilder("dailyRankingStep", jobRepository)
                .<DailyWallet, DailyRankingDto>chunk(10, transactionManager)
                .reader(dailyRankingReader())
                .processor(dailyRankingProcessor())
                .writer(dailyRankingWriter())
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(100)
                .listener(customSkipListner)
                .retry(Exception.class)
                .retryLimit(3)
                .listener(customSkipListner)
                .build();
    }

    @Bean
    public ItemReader<DailyWallet> dailyRankingReader() {
        return new RepositoryItemReaderBuilder<DailyWallet>()
                .name("readWalletInfo")
                .repository(dailyWalletRepository)
                .methodName("findAll")
                .pageSize(10)
                .sorts(Collections.singletonMap("uuid", Sort.Direction.ASC))
                .build();
    }

    @Bean //수익률 구하는 processor
    public ItemProcessor<DailyWallet, DailyRankingDto> dailyRankingProcessor() {
        return dailyWallet -> {
            if (dailyWallet.getYesterdayWon() == null && dailyWallet.getTodayWon() == null) {
                throw new CustomException(BaseResponseCode.NO_DATA);
            }
            double profit = ((double) (dailyWallet.getTodayWon()  //수익률 구하는 연산
                    - dailyWallet.getYesterdayWon()) / dailyWallet.getYesterdayWon()) * 100;

            //수익률 소숫점 3자리로 제한
            BigDecimal roundedProfit =
                    new BigDecimal(profit).setScale(3, RoundingMode.HALF_UP);

            return DailyRankingDto
                    .builder()
                    .uuid(dailyWallet.getUuid())
                    .todayWon(dailyWallet.getTodayWon())
                    .profit(roundedProfit.doubleValue())
                    .nickname(dailyWallet.getNickname())
                    .build();
        };
    }

    @Bean
    public ItemWriter<DailyRankingDto> dailyRankingWriter() {
        return items -> {
            for (DailyRankingDto item : items) {

                if(dailyRankingRepository.existsByUuid(item.getUuid())){
                    dailyRankingQueryDslmp.updateDailyRanking(item);
                }else {
                    DailyRanking dailyRanking = DailyRanking.builder()
                        .uuid(item.getUuid())
                        .won(item.getTodayWon())
                        .profit(item.getProfit())
                            .nickname(item.getNickname()).build();
                        dailyRankingRepository.save(dailyRanking);
                }
            }
            log.info("save dailyRanking");
        };
    }

참고자료

https://dkswnkk.tistory.com/707
https://velog.io/@clevekim/Spring-Batch%EB%9E%80-%EB%AC%B4%EC%97%87%EC%9D%B8%EA%B0%80

profile
아직말하는감자개발자

0개의 댓글