Spring Batch 성능 개선기

박우영·2023년 6월 19일
0

프로젝트

목록 보기
4/7

개요

Spring Batch 는 기본적으로 동기방식으로 동작합니다. 확실히 정확도는 올라가고 트랜잭션 처리가 간편하지만 속도가느리다는 단점이 있습니다. 지금 프로젝트에서 크롤링을 여러곳을 하고 정보를 많이 가져오고자 하다보니 동기로 한번 실행될때 시간이 많이 소모되고 서비스 확장을 생각했을때 더 많은곳을 크롤링 하고 더 많은 정보를 가져오려면 비동기 처리를 할 필요가 있다 판단했습니다.

또한 외부에서 데이터를 가져와 우리 DB에 저장하는 방식으로 구현되어있어 고유 ID 값으로 식별하지 않는다면 동일한 데이터를 중복으로 가져옵니다. 중복제거를 하려면 DB 에 DISTINCT 를 활용하는 방법도 있지만 저는 중복데이터가 저장되는 것 조차 막고싶었습니다.

Entity

@Entity
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor(access = PROTECTED)
public class JobStatistic {
    @Id @GeneratedValue(strategy = IDENTITY)
    private Long id;

    private String company;

    private String subject;

    @Column(unique = true)
    private String url;

    private String sector;

    private String startDate;

    private String deadLine;

    private int career; // 년차 신입은 default 0

    private int sectorCode;

동기 방식


먼저 동기방식입니다. Job 은 다음과 같이 구성되어 있습니다. 사람인 api 를 받아오고 원티드 사이트에서 각 분야별 스텝을 나누고 스텝별로 경력을 반복해서 가져옵니다.

Job

    @Bean
    public Job job1(JobRepository jobRepository) {
        return new JobBuilder("job1", jobRepository)
                .start(step1(jobRepository)) // 사람인
                .next(step2(jobRepository)) // 원티드 백엔드
                .next(step3(jobRepository)) // 원티드 프론트
                .next(step4(jobRepository)) // 원티드 풀스택
                .next(step5(jobRepository)) // 중복된 값 필터하고 db저장
//                .start(chunkSaram(jobRepository)) // 청크
                .next(step6(jobRepository)) // 기존 값들 초기화
                .next(step7(jobRepository)) // 데이터 삭제
                .build();
    }

Spring Batch Job

DB 작업 시작

DB 작업 끝

동기방식 결과

  • Job: 1시간 35분
  • DB: 1분 26초

비동기 방식


먼저 DB에 Insert Into Duplicate Key Update 설정을 해줬습니다.

Repository

    @Modifying
    @Query(value = """
            INSERT INTO job_statistic(
                  company,
                  subject,
                  url,
                  sector,
                  start_date,
                  dead_line,
                  career,
                  sector_code
                  )
                  VALUES(
                  :#{#dto.company},
                  :#{#dto.subject},
                  :#{#dto.url},
                  :#{#dto.sector},
                  :#{#dto.createDate},
                  :#{#dto.deadLine},
                  :#{#dto.career},
                  :#{#dto.sectorCode}
                  )
                  ON DUPLICATE KEY UPDATE
                    company = :#{#dto.company},
                    sector = :#{#dto.sector},
                    career = :#{#dto.career},
                 	subject = :#{#dto.subject}
        """, nativeQuery = true)
    void upsert(@Param("dto") JobResponseDto dto);

@Query 를 사용해 CRUD 를할때 @Modifying 어노테이션을 붙여줘야합니다.
(R 은 예외지만 보기쉽게 하기위해 포함하였습니다.)

run환경 비동기처리 실행(실패)

@Configuration
@RequiredArgsConstructor
@Slf4j
@EnableAsync
public class BatchScheduler {
    private final JobLauncher jobLauncher;
    private final BatchConfiguration batchConfiguration;
    private final JobRepository jobRepository;


    @Scheduled(cron = "${scheduler.cron.job}")
    @Async
    public void runJob() {
        log.debug("스케줄링 하는중");
        try {
            jobLauncher.run(batchConfiguration.job1(jobRepository),
                    new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
        } catch (JobParametersInvalidException | JobExecutionAlreadyRunningException | JobRestartException |
                 JobInstanceAlreadyCompleteException | UnexpectedRollbackException e) {
            log.error(e.getMessage());
        }
    }

    @Scheduled(cron = "${scheduler.cron.job}")
    @Async
    public void wontedBack() {
        log.debug("스케줄링 하는중");
        try {
            jobLauncher.run(batchConfiguration.job2(jobRepository),
                    new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
        } catch (JobParametersInvalidException | JobExecutionAlreadyRunningException | JobRestartException |
                 JobInstanceAlreadyCompleteException | UnexpectedRollbackException e) {
            log.error(e.getMessage());
        }
    }
    @Scheduled(cron = "${scheduler.cron.job}")
    @Async
    public void wontedFront() {
        log.debug("스케줄링 하는중");
        try {
            jobLauncher.run(batchConfiguration.job3(jobRepository),
                    new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
        } catch (JobParametersInvalidException | JobExecutionAlreadyRunningException | JobRestartException |
                 JobInstanceAlreadyCompleteException | UnexpectedRollbackException e) {
            log.error(e.getMessage());
        }
    }

    @Scheduled(cron = "${scheduler.cron.job}")
    @Async
    public void wontedFullStack() {
        log.debug("스케줄링 하는중");
        try {
            jobLauncher.run(batchConfiguration.job4(jobRepository),
                    new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
        } catch (JobParametersInvalidException | JobExecutionAlreadyRunningException | JobRestartException |
                 JobInstanceAlreadyCompleteException | UnexpectedRollbackException e) {
            log.error(e.getMessage());
        }
    }

Exception 발생

ERROR 2023-06-26 16:02:00,043 [task-7] [SimpleAsyncUncaughtExceptionHandler :: handleUncaughtException :: 39] - Unexpected exception occurred invoking async method: public void com.goodjob.batch.BatchScheduler.wontedBack()
org.springframework.dao.CannotAcquireLockException: PreparedStatementCallback; SQL [INSERT INTO BATCH_JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION)
	VALUES (?, ?, ?, ?)
]; Deadlock found when trying to get lock; try restarting transaction

에러로그 일부입니다. Batch가 실행되는데 메타데이터 테이블 관련하여 DB락이 발생하였습니다.

parallel

Spring batch 에서 비동기작업은 대표적으로 parallel 과 multi thread 방식이 있습니다.
저는 기존의 테스크렛 으로 작업했던것으로 parallel 방식으로 하기로했습니다.

기존 계획은 공고별로 job으로 나누고 job 별로 쓰레드풀을 생성하여 비동기 처리 하는것 이었지만 돌아가기까진 하지만 제 개발 pc인 맥북에서도 소음이 발생할 정도로 성능이 낮아졌었고 이를 해결하기 위해선 많은 시간이 필요할것같습니다. 따라서 기존의 방식에서 병렬처리를 추가하는 방식으로 진행하였습니다. 추후에 다른 작업들을 마치고 시간이 남는다면 추가 리팩토링을 진행할 예정입니다.

Job

    @Bean
    public Job job1(JobRepository jobRepository) {
        Flow saramin = new FlowBuilder<SimpleFlow>("saramin")
                .start(step1(jobRepository))
                .build();
        Flow wontedBack = new FlowBuilder<SimpleFlow>("wontedBack")
                .start(step2(jobRepository))
                .build();

        Flow wontedFront = new FlowBuilder<SimpleFlow>("wontedFront")
                .start(step3(jobRepository))
                .build();
        Flow wontedFullStack = new FlowBuilder<SimpleFlow>("wontedFullStack")
                .start(step4(jobRepository))
                .build();
        Flow db = new FlowBuilder<SimpleFlow>("db작업")
                .start(step5(jobRepository))
                .next(step6(jobRepository))
                .next(step7(jobRepository))
                .build();


        return new JobBuilder("job1", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(saramin)
                .split(taskExecutor()).add(wontedBack, wontedFront, wontedFullStack)
                .next(db)
                .end()
                .build();
    }

동기 vs 비동기 시간비교


데이터처리량은 약간의 오차가 있습니다.(크롤링,api 데이터가 합쳐져 있기때문)
하지만 실행코드는 동일하기때문에 동일한 환경이라고 가정하고 비교하겠습니다.

1회 데이터 처리량: 2000~3000개

동기

약 1시간30분~2시간 정도 소요

비동기

1차 parallel적용


약 20분 소요

2차 upsert까지 적용

parallel 만 적용했을때보다도 훨씬 빨라졌습니다. 병렬처리를 하기위해선 중복데이터 검증이 꼭 필요한데 기존의 중복데이터를 was 에서 검증 했었지만 이를 db와 분담하여 성능 개선을 하였습니다.

약 15분 소요

회고


결과적으로는 4배이상의 시간단축을 하며 성능을 개선하긴 했지만 많이 아쉬운 과정과 결과였습니다. map api 를 활용해서 좀 더 디테일한 정보를 제공해주는것이 1차 목표이기 때문에 더 리팩토링을 진행하기엔 시간이 없지만 작업을 마치고 시간이 남는다면, 아니면 프로젝트가 끝나더라도 추후에 리팩토링을 진행하며 Spring batch 및 비동기 처리의 개념을 확실하게 잡아 가고자 합니다.

0개의 댓글