https://velog.io/@alpahexia/MapBook-%ED%8C%8C%EC%9D%BC-%EB%8B%A4%EC%9A%B4%EB%A1%9C%EB%93%9C-%EC%9E%90%EB%8F%99%ED%99%94
위 링크는 기존에 존재하던 대출 횟수 합산을 위한 전국 도서관 장서 목록 File을 다운로드 자동화하는 과정에 대한 글이다.
매달 1억 2천만 건의 데이터를 통해 대출 횟수 최신화, 새롭게 업데이트 되야하는 도서 상세 데이터, 구조화된 로그 데이터에 대한 후처리 등 주기적으로 이뤄져야 하지만 대단히 번거로운 작업들이 있다. 이 작업을 수작업으로 하는 게 너무 비효율적이라고 생각이 들었다.
코드의 복잡성 증가.
- 방치되는 서비스로 되지 않기 위해
아래는 해당 프로세스를 통합 테스트를 했을 때의 Batch_Step_exuecution 테이블의 모습니다.
두개의 Fail이 있는데, 첫번째는 Required Update Book Table에서 Not_found라는 칼럼은 boolean 타입인데, 이때 default 값이 falut로 설정되고 nullable이였어야 했는데 설정이 잘못돼서 실패
두번째 실패는 모든 작업이 완료 후, 작업을 필요한 File들을 삭제하는 작업에서 File 하나를 열어놓은 상태였어서 시스템 에러.
총 소요시간은 1시간 15분 가량 걸렸다.
역시나 DownLoad 단계에서 가장 많은 시간이 소요가 됐다.
현재 병렬 스트림을 사용함으로써 시간을 단축시키긴 했지만 보다 나은 방법을 찾아봐야 한다.
두번째로 오래 걸린 Step은 DB에 입력하는 단계로 UpdateStep이다.
병렬 Step을 통해 Chunk Size를 10만을 기준으로 작업하고 있지만, 이 Chunck Size를 조절해서 최적의 Size를 찾아봐야겠다.
아래는 DownLoad Step에서 Java의 병렬 스트림을 사용하지 않았을 때와 했을 때의 소요 시간 차이를 보여 준다.
로컬 노트북의 사양은 아래와 같다. SSD 231GB이다.
Aggregating Step
UpdateStep
Aggregating Step이 Map을 통해서 ISBN를 Key로 Loan_Cnt을 합산하는 과정에서 map의 다량의 ISBN과 Loan_cnt 데이터가 포함돼 메모리의 양이 급격하게 증가할 것이란 건 예상 했었지만, DB와 연결하는 UpdateStep에서 CPU와 Memory 사용량이 더 높을 것이라고는 생각하지 못했다.
public class LibraryCatalogExecutor {
private final LibraryCatalogDownloader libraryCatalogDownloader;
// targetDate "(2023년 06월)"
@BatchLogging
public void executeProcess(Path input, List<ExecutionStep> executionSteps,
ExecutionStep... skipSteps) {
AtomicReference<Path> currentPath = new AtomicReference<>(input);
List<ExecutionStep> skipList = Arrays.stream(skipSteps).toList();
List<Path> completePath = new ArrayList<>();
executionSteps.stream()
.filter(step -> !skipList.contains(step))
.forEach(executionStep -> {
try {
Path outPutPath = executionStep.execute(currentPath.get());
if (!currentPath.get().subpath(0, 2).toString().equals("pipe/endStep")) {
completePath.add(currentPath.get());
}
currentPath.set(outPutPath);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
completePath.forEach(this::clearDirectory);
}
Executor에게 Step 목록을 전달하기 위해서 Builder 패턴을 사용 했다. StepBuilder의 코드는 아래와 같이 간단하다.
public class StepBuilder {
private List<ExecutionStep> executionSteps;
public StepBuilder start(ExecutionStep step){
if(executionSteps == null){
executionSteps = new ArrayList<>();
executionSteps.add(step);
}
return this;
}
public StepBuilder next(ExecutionStep nextStep){
if(executionSteps == null){
throw new IllegalArgumentException();
}
executionSteps.add(nextStep);
return this;
}
public List<ExecutionStep> end(){
return this.executionSteps;
}
}
public interface ExecutionStep {
Path execute(Path input) throws IOException;
}
어찌보면 Spring batch와 스타일이 비슷하다. Spring Batch를 학습하며 참고해서 기존에 복잡 했던 Executor Class를 Step과 StepBuilder를 통해 리팩토링 했다.
아래는 대출 횟수 최신화 Batch Config 코드의 일부분이다. Simple Job으로 구성되며, downloadStep부터 endAggregateStep까지는 기존에 사용하던 자바 I/O 기반의 코드들을 가져다가 활용해서 작업하고 있다. 그 이유는 여러개의 File과 대용량 데이터를 Spring Batch의 청크 기반 Task로 작업하는데에 아직 학습이 부족해 적용하지 못했다.
다만 그 다음부터 단계 Chunck 기반으로 Spring Batch가 제공하는 ItemReader와 ItemWriter를 사용해서 작업하고 있다.
public class LoanCntUpdateBatchConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final LibraryCatalogDownloader libraryCatalogDownloader;
private final LibraryCatalogWriter libraryCatalogWriter;
private final DownLoadFileClearTask downLoadFileClearTask;
private static final int GROUP_SIZE = 100;
@Bean
public Job libraryCatalogBatch() {
return jobBuilderFactory.get("updateLoanCntJob")
.start(downloadStep(null)) // Crawler와 조합해 File Download
.next(normalizeStep()) // 불필요한 데이터 제거. noraml format에 맞게 data cleansing
.next(aggregateStep())// ISBN별로 대출 횟수를 집계
.next(mergeStep()) // 합산 작업이 완료된 파일 하나로 합치기
.next(endAggregateStep()) //합쳐진 파일을 대상으로 다시 대출 횟수 집계
.next(updateToBookStep()) // DB와 연결하여 대출 횟수 update 및 update 필요한 도서 추가
.next(fileClearStep()) // 작업 완료 후 다운로드 및 작업 File 삭제
.build();
}