동적 스케줄링을 통한 캐시 Write back으로 DB 통신 횟수 줄이기 🔥

초록·2023년 11월 23일
1
post-thumbnail

문제

이 앱으로 한 번 담배 검수를 시작하면 1-2초에 1번씩 업데이트 요청을 보내게 됩니다. 그 때마다 DB에 반영한다면 응답 시간도 길어지고, 많은 사람들이 요청을 보내는 등 대용량 트래픽이 발생했을 시 DB 병목이 유발될 것이라 생각했습니다.

2개의 스케줄러로 동적으로 구현

그래서 업데이트 서비스가 실행되면 DB에 바로 쓰지 않고, 캐시에 넣어놓고 주기적으로 DB에 반영시키는 방식을 코드 수준에서 구현했었습니다. ScheduledFuture, TaskScheduler를 이용해, 동적으로 스케줄링할 수 있는 커스텀 스케줄러 서비스를 만들었습니다. update 서비스 코드에 Flushing(캐시 내용을 DB에 반영) 스케줄러와, 그 Flushing 스케줄러를 멈추는 스케줄러 를 작동시키는 코드를 삽입했습니다. 캐시에 쌓인 데이터가 Flushing 스케줄러에 의해 주기적으로 DB에 반영하는 작업을 하며, 일정 시간 동안 Dirty한 객체가 발생하지 않으면 두 스케줄러를 모두 삭제하도록 구현했습니다.

핵심 코드

스크린샷 2023-02-15 오후 12 39 04

전체 코드

// ScheduledFuture, TaskScheduler를 이용해 동적으로 스케줄링할 수 있는 커스텀 스케줄러 서비스
@Service
@RequiredArgsConstructor
@Slf4j
public class SchedulerService {

    private final TaskScheduler customThreadPoolTaskScheduler;

    private Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();

    public void register(String taskId, Runnable task, Long period) {
        ScheduledFuture<?> scheduledTask = customThreadPoolTaskScheduler.scheduleAtFixedRate(task, period);
        scheduledTasks.put(taskId, scheduledTask);
    }

    public void register(String taskId, Runnable task, Date startDate, Long period) {
        ScheduledFuture<?> scheduledTask = customThreadPoolTaskScheduler.scheduleAtFixedRate(task, startDate, period);
        scheduledTasks.put(taskId, scheduledTask);
    }

    // initialDelay는 Millisecond 단위
    public void register(String taskId, Runnable task, Long initialDelay, Long period) {

        Date startDate = Timestamp.valueOf(LocalDateTime.now().plusSeconds(TimeUnit.MILLISECONDS.toSeconds(initialDelay)));
        register(taskId, task, startDate, period);
    }

    public void registerLazyExec(String taskId, Runnable task, Long period) {
        register(taskId, task, period, period);
    }

    public void remove(String taskId) {
        if(scheduledTasks.get(taskId) != null) {
            scheduledTasks.get(taskId).cancel(true);
            scheduledTasks.remove(taskId);
            log.info(taskId + "가 종료되었습니다.");
        }
    }

    public boolean existsByTaskId(String taskId){
        return scheduledTasks.containsKey(taskId);
    }


}
// 주기적으로 Dirty한 캐시 내용을 DB에 반영할 flushing 스케줄러
@Component
@Slf4j
public class CigaretteOnListFlushingSchedulerManager {
    final private CigaretteOnListRepository cigaretteOnListRepository;
    final private SchedulerService schedulerService;
    final private SetCacheModule setCacheModule;

    final private UnaryOperator<List<CigaretteOnList>> dbWriteFunction;
    final private Long period;


    public CigaretteOnListFlushingSchedulerManager(CigaretteOnListRepository cigaretteOnListRepository, SchedulerService schedulerService, SetCacheModule setCacheModule) {
        this.cigaretteOnListRepository = cigaretteOnListRepository;
        this.schedulerService = schedulerService;
        this.setCacheModule = setCacheModule;

        this.dbWriteFunction = cigaretteOnListRepository::saveAll;
        this.period = TimeUnit.SECONDS.toMillis(10);
    }

    public void start(Long storeId) {

        String taskId = getTaskId(storeId);

        schedulerService.registerLazyExec(
                taskId,
                task(storeId),
                period);
        log.info(String.format("%s 스케줄러가 시작되었습니다.", taskId));
    }

    private Runnable task(Long storeId){
        return () -> {
            setCacheModule.flushAll(CacheType.CIGARETTE_DIRTY, storeId, dbWriteFunction);
            log.info(String.format("%s task가 수행됐습니다.", getTaskId(storeId)));
        };
    }

    public void startIfNotStarted(Long storeId){
        if(!existsByStoreId(storeId))
            start(storeId);

    }

    public void remove(Long storeId) {
        String taskId = getTaskId(storeId);
        schedulerService.remove(taskId);
    }

    public String getTaskId(Long storeId){
        return "cigarette-on-list-flushing-scheduler-of-storeId-" + storeId;
    }

    public boolean existsByStoreId(Long storeId){
        String taskId = getTaskId(storeId);
        return schedulerService.existsByTaskId(taskId);
    }




}
// 일정시간동안 flushing이 일어나지 않으면 Flushing 스케줄러를 멈출 스케줄러
public class CigaretteOnListStopFlushingSchedulerManager {

    final private CigaretteOnListFlushingSchedulerManager cigaretteOnListFlushingSchedulerManager;
    final private CigaretteOnListRepository cigaretteOnListRepository;
    final private SchedulerService schedulerService;

    final private UnaryOperator<List<CigaretteOnList>> dbWriteFunction;
    final private Long period;


    public CigaretteOnListStopFlushingSchedulerManager(CigaretteOnListFlushingSchedulerManager cigaretteOnListFlushingSchedulerManager, CigaretteOnListRepository cigaretteOnListRepository, SchedulerService schedulerService, CacheModule cacheModule) {
        this.cigaretteOnListFlushingSchedulerManager = cigaretteOnListFlushingSchedulerManager;
        this.cigaretteOnListRepository = cigaretteOnListRepository;
        this.schedulerService = schedulerService;

        this.dbWriteFunction = cigaretteOnListRepository::saveAll;
        this.period = TimeUnit.SECONDS.toMillis(31);
    }

    public void start(Long storeId) {

        String taskId = getTaskId(storeId);
        log.info(String.format("%s 스케줄러가 시작되었습니다.", taskId));
        schedulerService.registerLazyExec(
                taskId,
                () -> remove(storeId),
                period);
    }

    public void startIfNotStarted(Long storeId){
        if(!existsByStoreId(storeId))
            cigaretteOnListFlushingSchedulerManager.startIfNotStarted(storeId);
            start(storeId);
    }

    public void remove(Long storeId) {
        String taskId = getTaskId(storeId);
        schedulerService.remove(taskId);
        cigaretteOnListFlushingSchedulerManager.remove(storeId);
    }

    public String getTaskId(Long storeId){
        return "cigarette-on-list-stop-flushing-scheduler-of-storeId-" + storeId;
    }

    public boolean existsByStoreId(Long storeId){
        String taskId = getTaskId(storeId);
        return schedulerService.existsByTaskId(taskId);
    }

}

결국 철회했습니다

하지만 스케줄링 스레드가 API 서버에 존재하게 되기 때문에 스케일아웃 시 모든 WAS에 스케줄링 스레드가 존재하게 되어 불필요한 오버헤드가 될 것 같고, 로그처럼 덜 중요한 데이터가 아닌 비지니스 데이터가 DB에 빠르게 적용이 안되는 게 문제가 될 것 같아 철회했습니다.

느낀 점

결국 철회하긴 했지만, 어떻게하면 캐시로 인한 부하를 줄일까하는 고민도 의미있었다고 생각하고, Scheduling도 처음 해보았는데 머릿속에 그린 구조를 구현해볼 수 있어서 재미있었습니다.

profile
몰입하고 성장하는 삶을 동경합니다

1개의 댓글

comment-user-thumbnail
2024년 6월 21일

좋은 글 감사합니다.

답글 달기