transactional operator 커스텀해서 사용하기

모지리 개발자·2023년 8월 7일
1

spring

목록 보기
7/8

Intro

개발을 하다보면 여러가지 문제를 마주칠 때가 많다. 그리고 내가 겪는 대부분의 문제들은 누군가 이미 겪었을 확률이 99% 이기에 검색만 해봐도 대부분의 문제를 금방 해결할 수 있다. 예를 들어 티켓팅, 좌석예매, 선착순 이벤트 등 굉장히 유명한 문제에 대해서는 다양한 상황에 대해 다양한 해결방법들을 조언해준다.
ex...
만약 비슷한 문제를 마주하게 되고 충분히 적용가능한 상황이라면 누군가가 만들어 놓은 정답을 그대로 사용하는 것이 나쁘다고 생각하지 않는다. (물론 기술적인 고민을 충분히 한 상태에서)

하지만 이번에 내가 겪은 문제, 그리고 내가 원하는 것은 검색을 하기에도 애매했고 뭔가 맘에 드는 방법들이 딱히 없었다. 이 글은 굉장히 애매한 상황에 대해 해결한 방법에 대해 작성한 글이다.

문제

total_count 라는 필드를 가지고있는 테이블이 있다고 쳐보자. default 값은 0이다.

멀티쓰레드 환경에서 여러 트랜잭션이 total_count의 숫자를 동시에 올리려고하면 어떻게 될까?
아마 에러가 발생할 것이다. 필자의 경우에는 aws documentDB를 쓰고있었고 여러쓰레드를 띄운 후 테스트해본 결과 write conflict가 발생했다.
참고로 total_count에 숫자를 증가시키는 방법은 아래와 같이 쿼리를 사용했다.

    override suspend fun increase(id: String, n: Int): UpdateResult? {
        return reactiveMongoTemplate.update(xx::class.java)
            .matching(Query.query(Criteria.where("id").`is`(id)))
            .apply(Update().inc("total_count", n))
            .first().awaitSingle()
    }

보통 이런경우에는 어떻게 개발할까?
방법은 무궁무진하게 많겠지만 가장 첫번째로 생각나는 방법은 redis lock을 활용하여 충돌을 미연에 방지하는 것이다.
또는 fifo 큐를 활용하여 요청을 처리할수도있겠고 redis sortedset을 써서 pubsub 기능을 통해 구현할 수도 있을 것 같다.

근데 내 상황은 살짝 애매했다.
우선 멀티쓰레드 환경이고 write conflict가 발생할수도 있는건 맞지만 기획특성상 여러 요청이 동시에 들어올 확률이 굉장히 적었다. (실제 배포 이후에도 동시 요청의 상황이 거의 없었음)
그래서 redis lock을 쓰거나 큐를 사용하는 건 오버스펙 같았다. 심지어 크리티컬한 데이터도 아니였고 부분 유실도 허용되었다.

해결방법

이 문제는
import org.springframework.transaction.reactive.TransactionalOperator와 kotlin의 확장함수 기능을 활용하여 해결하였다.


suspend fun <T : Any> TransactionalOperator.retryWhenAndAwait(
    cause: KClass<out Throwable>? = NonTransientDataAccessException::class,
    maxAttempts: Int = 5,
    delay: Long = 500,
    f: suspend (ReactiveTransaction) -> T?
): T? {
    repeat(maxAttempts - 1) { repeatCount ->
        try {
            return executeAndAwait(f)
        } catch (ex: Throwable) {
            if (cause!!.java.isAssignableFrom(ex.javaClass)) {
                log.info { "try to recover (maxAttempts : $maxAttempts, repeatCount : $repeatCount, delay : $delay)" }
            } else {
                throw ex
            }
        }
        delay(delay)
    }
    return executeAndAwait(f)
}

함수의 간단한 설명은 이렇다.
cause는 발생가능한 에러 클래스를 명시한다.
maxAttempts는 최대 시도 횟수이다.
delay는 시도 횟수간의 delay 간격 시간이다.

사용방법

    suspend fun testTransactionalOperator() = transactionalOperator.retryWhenAndAwait { 
        // 로직 실행
    }

만약에 인자를 추가하고 싶다면 아래와 같이 하면된다.

    suspend fun testTransactionalOperator() = transactionalOperator.retryWhenAndAwait(cause = MongoTransactionException::class, maxAttempts = 10, delay = 700) { 
        // 로직 실행
    }

이 방법을 통해 트랜잭션간의 충돌 혹은 원하는 에러가 발생하면 maxAttempt만큼 delay의 시간 간격대로 재시도를 하게된다

profile
항상 부족하다 생각하며 발전하겠습니다.

1개의 댓글

comment-user-thumbnail
2023년 8월 9일

redis lock, fifo 큐, redis sortedset 이 방법들도 다뤄주세요~

답글 달기