Spring Batch 는 기본적으로 동기방식으로 동작합니다. 확실히 정확도는 올라가고 트랜잭션 처리가 간편하지만 속도가느리다는 단점이 있습니다. 지금 프로젝트에서 크롤링을 여러곳을 하고 정보를 많이 가져오고자 하다보니 동기로 한번 실행될때 시간이 많이 소모되고 서비스 확장을 생각했을때 더 많은곳을 크롤링 하고 더 많은 정보를 가져오려면 비동기 처리를 할 필요가 있다 판단했습니다.
또한 외부에서 데이터를 가져와 우리 DB에 저장하는 방식으로 구현되어있어 고유 ID 값으로 식별하지 않는다면 동일한 데이터를 중복으로 가져옵니다. 중복제거를 하려면 DB 에 DISTINCT 를 활용하는 방법도 있지만 저는 중복데이터가 저장되는 것 조차 막고싶었습니다.
@Entity
@SuperBuilder(toBuilder = true)
@Getter
@AllArgsConstructor
@NoArgsConstructor(access = PROTECTED)
public class JobStatistic {
@Id @GeneratedValue(strategy = IDENTITY)
private Long id;
private String company;
private String subject;
@Column(unique = true)
private String url;
private String sector;
private String startDate;
private String deadLine;
private int career; // 년차 신입은 default 0
private int sectorCode;
먼저 동기방식입니다. Job 은 다음과 같이 구성되어 있습니다. 사람인 api 를 받아오고 원티드 사이트에서 각 분야별 스텝을 나누고 스텝별로 경력을 반복해서 가져옵니다.
@Bean
public Job job1(JobRepository jobRepository) {
return new JobBuilder("job1", jobRepository)
.start(step1(jobRepository)) // 사람인
.next(step2(jobRepository)) // 원티드 백엔드
.next(step3(jobRepository)) // 원티드 프론트
.next(step4(jobRepository)) // 원티드 풀스택
.next(step5(jobRepository)) // 중복된 값 필터하고 db저장
// .start(chunkSaram(jobRepository)) // 청크
.next(step6(jobRepository)) // 기존 값들 초기화
.next(step7(jobRepository)) // 데이터 삭제
.build();
}
먼저 DB에 Insert Into Duplicate Key Update 설정을 해줬습니다.
@Modifying
@Query(value = """
INSERT INTO job_statistic(
company,
subject,
url,
sector,
start_date,
dead_line,
career,
sector_code
)
VALUES(
:#{#dto.company},
:#{#dto.subject},
:#{#dto.url},
:#{#dto.sector},
:#{#dto.createDate},
:#{#dto.deadLine},
:#{#dto.career},
:#{#dto.sectorCode}
)
ON DUPLICATE KEY UPDATE
company = :#{#dto.company},
sector = :#{#dto.sector},
career = :#{#dto.career},
subject = :#{#dto.subject}
""", nativeQuery = true)
void upsert(@Param("dto") JobResponseDto dto);
@Query 를 사용해 CRUD 를할때 @Modifying 어노테이션을 붙여줘야합니다.
(R 은 예외지만 보기쉽게 하기위해 포함하였습니다.)
@Configuration
@RequiredArgsConstructor
@Slf4j
@EnableAsync
public class BatchScheduler {
private final JobLauncher jobLauncher;
private final BatchConfiguration batchConfiguration;
private final JobRepository jobRepository;
@Scheduled(cron = "${scheduler.cron.job}")
@Async
public void runJob() {
log.debug("스케줄링 하는중");
try {
jobLauncher.run(batchConfiguration.job1(jobRepository),
new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
} catch (JobParametersInvalidException | JobExecutionAlreadyRunningException | JobRestartException |
JobInstanceAlreadyCompleteException | UnexpectedRollbackException e) {
log.error(e.getMessage());
}
}
@Scheduled(cron = "${scheduler.cron.job}")
@Async
public void wontedBack() {
log.debug("스케줄링 하는중");
try {
jobLauncher.run(batchConfiguration.job2(jobRepository),
new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
} catch (JobParametersInvalidException | JobExecutionAlreadyRunningException | JobRestartException |
JobInstanceAlreadyCompleteException | UnexpectedRollbackException e) {
log.error(e.getMessage());
}
}
@Scheduled(cron = "${scheduler.cron.job}")
@Async
public void wontedFront() {
log.debug("스케줄링 하는중");
try {
jobLauncher.run(batchConfiguration.job3(jobRepository),
new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
} catch (JobParametersInvalidException | JobExecutionAlreadyRunningException | JobRestartException |
JobInstanceAlreadyCompleteException | UnexpectedRollbackException e) {
log.error(e.getMessage());
}
}
@Scheduled(cron = "${scheduler.cron.job}")
@Async
public void wontedFullStack() {
log.debug("스케줄링 하는중");
try {
jobLauncher.run(batchConfiguration.job4(jobRepository),
new JobParametersBuilder().addDate("date", new Date()).toJobParameters());
} catch (JobParametersInvalidException | JobExecutionAlreadyRunningException | JobRestartException |
JobInstanceAlreadyCompleteException | UnexpectedRollbackException e) {
log.error(e.getMessage());
}
}
ERROR 2023-06-26 16:02:00,043 [task-7] [SimpleAsyncUncaughtExceptionHandler :: handleUncaughtException :: 39] - Unexpected exception occurred invoking async method: public void com.goodjob.batch.BatchScheduler.wontedBack()
org.springframework.dao.CannotAcquireLockException: PreparedStatementCallback; SQL [INSERT INTO BATCH_JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION)
VALUES (?, ?, ?, ?)
]; Deadlock found when trying to get lock; try restarting transaction
에러로그 일부입니다. Batch가 실행되는데 메타데이터 테이블 관련하여 DB락이 발생하였습니다.
Spring batch 에서 비동기작업은 대표적으로 parallel 과 multi thread 방식이 있습니다.
저는 기존의 테스크렛 으로 작업했던것으로 parallel 방식으로 하기로했습니다.
기존 계획은 공고별로 job으로 나누고 job 별로 쓰레드풀을 생성하여 비동기 처리 하는것 이었지만 돌아가기까진 하지만 제 개발 pc인 맥북에서도 소음이 발생할 정도로 성능이 낮아졌었고 이를 해결하기 위해선 많은 시간이 필요할것같습니다. 따라서 기존의 방식에서 병렬처리를 추가하는 방식으로 진행하였습니다. 추후에 다른 작업들을 마치고 시간이 남는다면 추가 리팩토링을 진행할 예정입니다.
@Bean
public Job job1(JobRepository jobRepository) {
Flow saramin = new FlowBuilder<SimpleFlow>("saramin")
.start(step1(jobRepository))
.build();
Flow wontedBack = new FlowBuilder<SimpleFlow>("wontedBack")
.start(step2(jobRepository))
.build();
Flow wontedFront = new FlowBuilder<SimpleFlow>("wontedFront")
.start(step3(jobRepository))
.build();
Flow wontedFullStack = new FlowBuilder<SimpleFlow>("wontedFullStack")
.start(step4(jobRepository))
.build();
Flow db = new FlowBuilder<SimpleFlow>("db작업")
.start(step5(jobRepository))
.next(step6(jobRepository))
.next(step7(jobRepository))
.build();
return new JobBuilder("job1", jobRepository)
.incrementer(new RunIdIncrementer())
.start(saramin)
.split(taskExecutor()).add(wontedBack, wontedFront, wontedFullStack)
.next(db)
.end()
.build();
}
데이터처리량은 약간의 오차가 있습니다.(크롤링,api 데이터가 합쳐져 있기때문)
하지만 실행코드는 동일하기때문에 동일한 환경이라고 가정하고 비교하겠습니다.
1회 데이터 처리량: 2000~3000개
약 1시간30분~2시간 정도 소요
약 20분 소요
parallel 만 적용했을때보다도 훨씬 빨라졌습니다. 병렬처리를 하기위해선 중복데이터 검증이 꼭 필요한데 기존의 중복데이터를 was 에서 검증 했었지만 이를 db와 분담하여 성능 개선을 하였습니다.
약 15분 소요
결과적으로는 4배이상의 시간단축을 하며 성능을 개선하긴 했지만 많이 아쉬운 과정과 결과였습니다. map api 를 활용해서 좀 더 디테일한 정보를 제공해주는것이 1차 목표이기 때문에 더 리팩토링을 진행하기엔 시간이 없지만 작업을 마치고 시간이 남는다면, 아니면 프로젝트가 끝나더라도 추후에 리팩토링을 진행하며 Spring batch 및 비동기 처리의 개념을 확실하게 잡아 가고자 합니다.