이 앱으로 한 번 담배 검수를 시작하면 1-2초에 1번씩 업데이트 요청을 보내게 됩니다. 그 때마다 DB에 반영한다면 응답 시간도 길어지고, 많은 사람들이 요청을 보내는 등 대용량 트래픽이 발생했을 시 DB 병목이 유발될 것이라 생각했습니다.
그래서 업데이트 서비스가 실행되면 DB에 바로 쓰지 않고, 캐시에 넣어놓고 주기적으로 DB에 반영시키는 방식을 코드 수준에서 구현했었습니다. ScheduledFuture, TaskScheduler를 이용해, 동적으로 스케줄링할 수 있는 커스텀 스케줄러 서비스를 만들었습니다. update 서비스 코드에 Flushing(캐시 내용을 DB에 반영) 스케줄러와, 그 Flushing 스케줄러를 멈추는 스케줄러 를 작동시키는 코드를 삽입했습니다. 캐시에 쌓인 데이터가 Flushing 스케줄러에 의해 주기적으로 DB에 반영하는 작업을 하며, 일정 시간 동안 Dirty한 객체가 발생하지 않으면 두 스케줄러를 모두 삭제하도록 구현했습니다.
// ScheduledFuture, TaskScheduler를 이용해 동적으로 스케줄링할 수 있는 커스텀 스케줄러 서비스
@Service
@RequiredArgsConstructor
@Slf4j
public class SchedulerService {
private final TaskScheduler customThreadPoolTaskScheduler;
private Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
public void register(String taskId, Runnable task, Long period) {
ScheduledFuture<?> scheduledTask = customThreadPoolTaskScheduler.scheduleAtFixedRate(task, period);
scheduledTasks.put(taskId, scheduledTask);
}
public void register(String taskId, Runnable task, Date startDate, Long period) {
ScheduledFuture<?> scheduledTask = customThreadPoolTaskScheduler.scheduleAtFixedRate(task, startDate, period);
scheduledTasks.put(taskId, scheduledTask);
}
// initialDelay는 Millisecond 단위
public void register(String taskId, Runnable task, Long initialDelay, Long period) {
Date startDate = Timestamp.valueOf(LocalDateTime.now().plusSeconds(TimeUnit.MILLISECONDS.toSeconds(initialDelay)));
register(taskId, task, startDate, period);
}
public void registerLazyExec(String taskId, Runnable task, Long period) {
register(taskId, task, period, period);
}
public void remove(String taskId) {
if(scheduledTasks.get(taskId) != null) {
scheduledTasks.get(taskId).cancel(true);
scheduledTasks.remove(taskId);
log.info(taskId + "가 종료되었습니다.");
}
}
public boolean existsByTaskId(String taskId){
return scheduledTasks.containsKey(taskId);
}
}
// 주기적으로 Dirty한 캐시 내용을 DB에 반영할 flushing 스케줄러
@Component
@Slf4j
public class CigaretteOnListFlushingSchedulerManager {
final private CigaretteOnListRepository cigaretteOnListRepository;
final private SchedulerService schedulerService;
final private SetCacheModule setCacheModule;
final private UnaryOperator<List<CigaretteOnList>> dbWriteFunction;
final private Long period;
public CigaretteOnListFlushingSchedulerManager(CigaretteOnListRepository cigaretteOnListRepository, SchedulerService schedulerService, SetCacheModule setCacheModule) {
this.cigaretteOnListRepository = cigaretteOnListRepository;
this.schedulerService = schedulerService;
this.setCacheModule = setCacheModule;
this.dbWriteFunction = cigaretteOnListRepository::saveAll;
this.period = TimeUnit.SECONDS.toMillis(10);
}
public void start(Long storeId) {
String taskId = getTaskId(storeId);
schedulerService.registerLazyExec(
taskId,
task(storeId),
period);
log.info(String.format("%s 스케줄러가 시작되었습니다.", taskId));
}
private Runnable task(Long storeId){
return () -> {
setCacheModule.flushAll(CacheType.CIGARETTE_DIRTY, storeId, dbWriteFunction);
log.info(String.format("%s task가 수행됐습니다.", getTaskId(storeId)));
};
}
public void startIfNotStarted(Long storeId){
if(!existsByStoreId(storeId))
start(storeId);
}
public void remove(Long storeId) {
String taskId = getTaskId(storeId);
schedulerService.remove(taskId);
}
public String getTaskId(Long storeId){
return "cigarette-on-list-flushing-scheduler-of-storeId-" + storeId;
}
public boolean existsByStoreId(Long storeId){
String taskId = getTaskId(storeId);
return schedulerService.existsByTaskId(taskId);
}
}
// 일정시간동안 flushing이 일어나지 않으면 Flushing 스케줄러를 멈출 스케줄러
public class CigaretteOnListStopFlushingSchedulerManager {
final private CigaretteOnListFlushingSchedulerManager cigaretteOnListFlushingSchedulerManager;
final private CigaretteOnListRepository cigaretteOnListRepository;
final private SchedulerService schedulerService;
final private UnaryOperator<List<CigaretteOnList>> dbWriteFunction;
final private Long period;
public CigaretteOnListStopFlushingSchedulerManager(CigaretteOnListFlushingSchedulerManager cigaretteOnListFlushingSchedulerManager, CigaretteOnListRepository cigaretteOnListRepository, SchedulerService schedulerService, CacheModule cacheModule) {
this.cigaretteOnListFlushingSchedulerManager = cigaretteOnListFlushingSchedulerManager;
this.cigaretteOnListRepository = cigaretteOnListRepository;
this.schedulerService = schedulerService;
this.dbWriteFunction = cigaretteOnListRepository::saveAll;
this.period = TimeUnit.SECONDS.toMillis(31);
}
public void start(Long storeId) {
String taskId = getTaskId(storeId);
log.info(String.format("%s 스케줄러가 시작되었습니다.", taskId));
schedulerService.registerLazyExec(
taskId,
() -> remove(storeId),
period);
}
public void startIfNotStarted(Long storeId){
if(!existsByStoreId(storeId))
cigaretteOnListFlushingSchedulerManager.startIfNotStarted(storeId);
start(storeId);
}
public void remove(Long storeId) {
String taskId = getTaskId(storeId);
schedulerService.remove(taskId);
cigaretteOnListFlushingSchedulerManager.remove(storeId);
}
public String getTaskId(Long storeId){
return "cigarette-on-list-stop-flushing-scheduler-of-storeId-" + storeId;
}
public boolean existsByStoreId(Long storeId){
String taskId = getTaskId(storeId);
return schedulerService.existsByTaskId(taskId);
}
}
하지만 스케줄링 스레드가 API 서버에 존재하게 되기 때문에 스케일아웃 시 모든 WAS에 스케줄링 스레드가 존재하게 되어 불필요한 오버헤드가 될 것 같고, 로그처럼 덜 중요한 데이터가 아닌 비지니스 데이터가 DB에 빠르게 적용이 안되는 게 문제가 될 것 같아 철회했습니다.
결국 철회하긴 했지만, 어떻게하면 캐시로 인한 부하를 줄일까하는 고민도 의미있었다고 생각하고, Scheduling도 처음 해보았는데 머릿속에 그린 구조를 구현해볼 수 있어서 재미있었습니다.
좋은 글 감사합니다.