지난번 7편에서 kafka를 이용해 db의 부하를 줄이는 작업은 수행했으나, 동시성 문제를 해결하지 못해 여러건의 요청이 들어올경우 중복되어 저장되는 문제가 있었다.
Redis는 인메모리 기반 데이터베이스로, 싱글 쓰레드로 작동해 매 작업이 atomic해 동시성 문제를 배제할 수 있다.
여기서 MSA기반 아키텍쳐가 필연적으로 가질 수 있는 동시성 문제의 해결 단서를 찾을 수 있다.
기본적으로 유저 확인 -> 좌석 확인 -> 비어있음? -> 예약해
이 순서로 로직이 흘러가는데, 좌석 확인 -> 비어있음 과정과, 예약해 과정이 동시적으로 발생한다.
분명히 A 쓰레드는 비어있다 해서 db에 저장 요청을 넣었는데, 이게 저장 되기도 전에 B 쓰레드가 조회를 하니 비어있을 수 밖에
Redis는 싱글쓰레드이므로 A가 조회후 저장해~ 하면 B쓰레드는 A가 저장까지 하고 나서 명령어를 수행할 수 있다.
물론 싱글쓰레드이기에 처리 속도는 낮을 수 있지만, 인메모리 기반이기때문에 오히려 디스크에서 정보를 가져오는 db보다 접근속도가 빠르다고 한다.
이것이 atomicity라고 하는데, redis를 이용한 방법 외에도 트랜잭션 격리 수준, 비관적 - 낙관적 락을 이용해서 해결할 수도 있다.
Redis의 GET을 이용한다.
Redis의 기본 연산은 원자적이기 때문에, 모든 요청들은 순차적으로 값에 접근 할 수 있다. 누가 set하는동안 get을 할 수 없다는것이다.
그러면 좌석값을 GET을 이용해서 가져오면 되겠네?
GET을 이용하는것까지는 좋았는데, redis에서 값이 없는걸 확인한 후 db에서 올바른지 체크한다음~ 저장해야지 하고 요청한순간 다른 쓰레드에게 접근 권한이 넘어가 redis에 값을 갱신하기도 전에 GET을 호출하게 되면서 동시성 문제가 발생했다.
GET과 동시에 SET하는 과정까지 점유를 수행해야 하는데, 그렇지 못해서 발생한 문제였다.
GETSET이라는 명령어가 있다.
GET과 동시에 값을 집어넣는 atomic한 연산이므로 조회가 된 맨 처음 쓰레드가 set을 함으로써 다른 쓰레드의 접근을 막을 수 있는 방법이였다.
얘도 이론상으로는 잘 작동하겠지만.. 요청받은 dto가 올바른 요청인지 db에서 검증하는 과정을 거쳐야 하는데, getset을 하게 되면 이 과정을 추가하지못해 사용할 수 없게 되었다.
이걸 사용하기 위해선, db에 저장된 데이터를 캐싱한 뒤, redis 호출전 비교를 하면 될것같기도 했다.
spin-lock이라는 개념이 있다. 동시성 문제를 해결하기 위해 병목지점에 락을 걸어두고, 하나의 쓰레드만 해당 지점에 접근할 수 있게 하며 다른쓰레드는 대기하도록 하는 방식이다.
여기서는 Redisson과 RLock을 이용해 분산 락을 구현했다.
https://velog.io/@tomy8964/MSA-%EC%95%84%ED%82%A4%ED%85%8D%EC%B2%98%EC%97%90%EC%84%9C-Redis%EB%A5%BC-%ED%99%9C%EC%9A%A9%ED%95%9C-%EB%B6%84%EC%82%B0%EB%9D%BD-%EC%A0%81%EC%9A%A9%EC%9D%84-Spring-AOP%EB%A5%BC-%EC%82%AC%EC%9A%A9%ED%95%B4-%EC%9E%AC%EC%82%AC%EC%9A%A9%EC%84%B1-%EB%86%92%EA%B2%8C-%EC%A0%81%EC%9A%A9%ED%95%98%EB%8A%94-%EB%B0%A9%EB%B2%95
를 참조했다.
@Aspect
@Component
class RedisLockAspect(private val client : RedissonClient) {
private val logger = logger()
@Around("@annotation(com.example.event.aop.anno.RedissonLock)")
fun redissonLock(joinPoint: ProceedingJoinPoint) : Any {
val signature : MethodSignature = joinPoint.signature as MethodSignature
val method = signature.method
// annotation info
val annotation = method.getAnnotation(RedissonLock::class.java)
val paramDTO = joinPoint.args[0] as SeatDTO
// lock key
val key = "occupy-${method.name}-${paramDTO.eventId}-${paramDTO.seatId}"
// lock
val lock : RLock = client.getLock(key)
try {
val status = lock.tryLock(annotation.waitTime, annotation.leaseTime, TimeUnit.MILLISECONDS)
if (!status) {
logger.warn("cannot retrieve lock")
throw EventException("lock을 가져올 수 없습니다.", null)
}
logger.info("get lock")
val result : Any = runBlocking {
withContext(Dispatchers.IO) {
joinPoint.proceed(joinPoint.args)
}
}
return result
}
catch (e : InterruptedException) {
logger.error("retreive interrupted exception")
throw e
}
finally {
// 락이 되어 있는경우만 언락
if (lock.isLocked && lock.isHeldByCurrentThread)
lock.unlock()
}
}
}
스프링의 AOP를 이용해 EventService의 특정 메소드에 annotation을 달고, 해당 메소드 전 후로 락을 적용하는 구문을 작성했다.
@RedissonLock(value = "ticket_lock")
suspend fun occupySeat(seatDTO: SeatDTO) : SeatDTO {
val seatCache : String? = redissonClient.getBucket<String>(getKey(seatDTO.eventId, seatDTO.seatId)).get()
// redis cache가 없는경우 조회
if (seatCache != null)
throw EventException("예약할 수 없는 좌석입니다. id : ${seatDTO.seatId}", null)
val seat = findSeatWithEventId(seatDTO.eventId, seatDTO.seatId)
if (seat == null)
throw EventException("존재하지 않는 좌석 정보 입니다. id : ${seatDTO.seatId}", null)
if (seat.status != Status.OPEN)
throw EventException("예약할 수 없는 좌석입니다. id : ${seat.id}", null)
val seatSaveDTO = SeatSaveDTO(seatDTO.eventId, seatDTO.seatId, Status.CLOSE) //Event와 유사한 DTO
redissonClient.getBucket<String>(getKey(seatDTO.eventId, seatDTO.seatId)).set("CLOSED") // redis 주입
kafkaProducer.sendOccupyRequest(seatSaveDTO)
return seatDTO //delete boolean (check랑 통합)
}
이벤트 처리 서비스는 db에서 올바른 좌석값인지 valid를 수행한뒤 kafka로 요청을 보내 저장을 수행한다.
그래서 기본적으로 occupySeat 메소드는 락이 걸려있어, atomic하게 동작하므로 동시성을 보장받을 수 있으며, kafkaProducer로 데이터가 전달되기도 전에 락이 풀리는걸 방지하기 위해 추가적으로 Redis에서 값을 조회하고 비교한다.
스핀락 혹은 분산락은 조금 많이 성능이 좋지 않다. 작동하는 쓰레드를 강제로 멈추고 대기시키는 방식이기 때문에 성능부분에서 리스크가 크다.
https://f-lab.kr/blog/redis-command-for-atomic-operation
해당 글에서도 확인할 수 있는 부분이다.
따라서, 스핀락보다는 최대한 트랜잭션의 격리 레벨을 잘 설정하고, 처리 로직을 변경해 스핀락이 아닌, redis의 값을 단순하게 조회해 즉각적으로 반환할 수 있도록 하는게 좋을 것 같다.
GET은 atomic하니..
val result : Any = runBlocking {
withContext(Dispatchers.IO) {
joinPoint.proceed(joinPoint.args)
}
}
return result
애초에 동기식으로 작성해도 해당 proceed가 다 수행된다음 락이 풀리기 때문에 이렇게 해결했다.