Redis만으로 서버 대기열을 구현하는 여러 방법들

rvlwldev·2024년 9월 21일

티켓팅, 한정판 및 선착순 구매 등 갯수에 대한 데이터 정합성이 중요할 때가 있다.
하지만 많은 유저가 몰리는 순간 모든 요청을 받으면 DB, 서버가 버티지 못할 위험이 있다.
많은 개발자들이 MQ를 생각하지만 작업 시간과 카프카와 같은 인프라가 없을 수도 있다.
또한 MQ를 도입한다는 것이 실무에서는 꽤나 부담스러운 경우가 많다.
때문에 Spring 기반의 서버와 레디스(Lettuce)만으로 대기열을 구현하는 4가지 정도의 방법이 있다.

  1. 스케줄러
    • 서버에서 주기적으로 대기열에서 활성열로 옮겨준다.
  2. Pub/Sub
    • 서버에서 이벤트를 직접 발행한다.
    • 별도의 이벤트 리스너가 대기열에서 활성열로 옮겨준다.
  3. Keyspace Notification
    • 레디스에서 활성열 데이터가 삭제되면 이벤트가 발행된다.
    • 별도의 이벤트 리스너가 대기열에서 활성열로 옮겨준다.
  4. Stream
    • 아래의 대기열 진입 구조를 따르지 않으며 MQ방식과 비슷하다.

대기열 진입 구조

대기열을 위해 Sorted Set을 사용한다.
Score에 진입 시각을 넣으면 자연스럽게 FIFO 구조가 된다.
ZRANK 명령어로 순번을 아주 빠르게 조회할 수 있다.

이후 활성열 관리 방법에 따라 위 1, 2, 3번의 방식으로 나뉘어진다.

DTO

sealed class QueueResult {
    data class Waiting(
        val uid: String,
        val rank: Long,
        val token: String
    ) : QueueResult()

    data class Active(
        val uid: String,
        val token: String
    ) : QueueResult()

    object NotFound : QueueResult()
}

대기열 진입 로직

상품별로 대기열을 분리한다.
그래야 인기 없는 상품의 토큰으로 인기 상품 결제를 시도하는 편법을 막을 수 있다.
이 예시에서는 유저별로 JWT를 발급하고 해당 토큰으로 유저가 유저의 상태를 조회할 수 있다.

// 대기열 Sorted Set Key/Member
private const val WAIT_KEY = "wait:id:%d"
private const val WAIT_MEMBER = "user:%s:token:%s"

// 활성열 Sorted Set Key
private const val ACTIVE_KEY = "active:id:%d"

// 대기열이 존재하는 상품 ID 목록
private const val ACTIVE_PRODUCTS_KEY = "queue:products" 

// 최대 활성열 갯수
private const val ACTIVE_MAX = 10

// 활성 상태 유효 시간 (10분)
private const val ACCESS_TTL_MILLIS = 10 * 60 * 1000L

// redis -> org.springframework.data.redis.core.StringRedisTemplate
// jwt -> JWT 커스텀 컴포넌트

fun createWaitToken(pid: Long, uid: String): QueueResult.Waiting {
    val token = jwt.create("wait", mapOf("pid" to pid, "uid" to uid))
    val now = System.currentTimeMillis().toDouble()
    val key = WAIT_KEY.format(pid)
    val member = WAIT_MEMBER.format(uid, token)

    redis.opsForSet().add(ACTIVE_PRODUCTS_KEY, pid.toString())
    redis.opsForZSet().add(key, member, now)

    val rank = redis.opsForZSet().rank(key, member)?.plus(1) ?: 1L
    return QueueResult.Waiting(uid, rank, token)
}

저장 구조:

  • 키: wait:id:{pid}
  • 멤버: user:{uid}:token:{token}
  • 스코어: 진입 시각 (밀리초 타임스탬프)

순번 조회

fun getWaitQueueRank(pid: Long, uid: String, token: String): Long? {
    val key = WAIT_KEY.format(pid)
    val member = WAIT_MEMBER.format(uid, token)
    return redis.opsForZSet().rank(key, member)?.plus(1) // 0-based → 1-based
}

스케줄러

가장 간단하고 빠르게 구현할 수 있다.
일시적으로 빠르게 대기열이 필요하다면 가장 많이 쓰이는 방법이지 않을까

스케줄러가 주기적으로 활성열을 확인한다.
여유 갯수만큼 대기열에서 꺼내 이동시킨다.
활성열도 Sorted Set으로 관리하며, Score에 만료 타임스탬프를 저장한다.

활성열 이동

@Scheduled(fixedDelay = 5000)
fun promote() {
    val setops = redis.opsForSet()
    val zsetops = redis.opsForZSet()
    val ids = setops.members(ACTIVE_PRODUCTS_KEY) ?: return
    
    ids.forEach { pid -> 
    	val waitKey = WAIT_KEY.format(pid)
        val activeKey = ACTIVE_KEY.format(pid)
        val now = System.currentTimeMillis()
        
        // 활성열의 만료된 멤버 제거
        zsetops.removeRangeByScore(activeKey, 0.0, now.toDouble())
        
        // 활성열의 여유 갯수 계산
        val size = zsetops.size(activeKey) ?: 0L
        val spares = (ACTIVE_MAX - size).coerceAtLeast(0)
        if (spares == 0L) return@forEach
        
        // 대기열의 상위 멤버 추출
        val candidates = zsetops.range(waitKey, 0, spares - 1L)
        if (candidates.isNullOrEmpty()) {
            set.remove(ACTIVE_PRODUCTS_KEY, pid.toString())
            return@forEach
        }

		// 활성열로 이동
        val expireAt = (now + ACCESS_TTL_MILLIS).toDouble()
        candidates.forEach { member ->
            zset.add(activeKey, member, expireAt)
            zset.remove(waitKey, member)
    }

매 주기마다 ZREMRANGEBYSCORE로 만료된 멤버를 일괄 제거한다.
이후 빈 슬롯만큼 대기열 상위 멤버를 활성열로 옮긴다.
유저가 다음 과정(ex. 결제)없이 이탈해도 Score(만료시각)가 지나면 다음 주기에 자동으로 정리된다.

Lua 스크립트

위 코드는 읽기(ZCARD)와 쓰기(ZADD)가 분리되어 있다.
멀티 인스턴스 환경에서 두 인스턴스가 동시에 실행하면:

Instance A: ZCARD → 8  (2개 여유)
Instance B: ZCARD → 8  (2개 여유)
Instance A: ZADD 멤버1, 멤버2
Instance B: ZADD 멤버3, 멤버4  → ACTIVE_MAX 초과

분산락으로 막을 수 있지만, 락 획득 실패 시 주기를 통째로 건너뛰게 된다.
락을 가지고 서버가 다운되면 TTL 만료까지 활성열 이동 로직이 멈춘다.
근본적인 해결은 읽기-판단-쓰기를 하나의 원자적 연산으로 묶는 것이다.

Redis는 싱글 스레드로 명령을 처리하고, Lua 스크립트는 중단 없이 실행된다.

-- promote.lua
-- KEYS[1] = activeKey
-- KEYS[2] = waitKey

-- ARGV[1] = now
-- ARGV[2] = expireAt
-- ARGV[3] = ACTIVE_MAX

redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])

local size = redis.call('ZCARD', KEYS[1])
local spares = math.max(tonumber(ARGV[3]) - size, 0)
if spares == 0 then return 0 end

local candidates = redis.call('ZRANGE', KEYS[2], 0, spares - 1)
if #candidates == 0 then return -1 end

local zaddArgs = {}
local zremArgs = {}
for _, member in ipairs(candidates) do
    table.insert(zaddArgs, ARGV[2])
    table.insert(zaddArgs, member)
    table.insert(zremArgs, member)
end

redis.call('ZADD', KEYS[1], unpack(zaddArgs))
redis.call('ZREM', KEYS[2], unpack(zremArgs))

return #candidates
private val script = RedisScript.of(ClassPathResource("promote.lua"), Long::class.java)

@Scheduled(fixedDelay = 5000)
fun promote() {
	val ids = redis.opsForSet().members(ACTIVE_PRODUCTS_KEY) ?: return
    ids.forEach { pid ->
    	val keys = listOf(ACTIVE_KEY.format(pid), WAIT_KEY.format(pid))
        val now = System.currentTimeMillis()
        val expireAt = now + ACCESS_TTL_MILLIS

        val result = redis.execute(script, keys,
            now.toString(), 
            expireAt.toString(), 
            ACTIVE_MAX.toString()
		)

        if (result == -1L)
            redis.opsForSet().remove(ACTIVE_PRODUCTS_KEY, pid)
    }
}

이후 결제 등 다음 단계 완료 시 pid, uid와 token으로 활성열의 데이터를 지운다.

하지만 활성열의 여유가 있어도 다음 주기까지 N초를 기다리게 된다.
많은 사람들이 이 지연을 굉장히 싫어한다.
때문이 여유가 있다면 아래 방법을 생각해볼 수 있다.


Pub/Sub & Keyspace Notification

두 방식 모두 스케줄러처럼 주기를 기다리지 않는 이벤트 기반이다.
활성열에 여유가 생기면 즉시 트리거된다.
이 둘의 차이는 누가 이벤트를 발행하는가, 발행 주체의 차이이다.

  • Pub/Sub: 서버가 결제 완료 시점에 직접 PUBLISH 한다.
  • Keyspace Notification: 키 만료, 삭제 시점에 레디스에서 자동으로 발행된다.

Keyspace Notification 또한 기본적으로 Pub/Sub 으로 동작한다.
때문에 발행 시점에 구독자가 없다면 이벤트는 유실된다.
단독으로 쓰면 위험할 수 있으니 보완용 스케줄러도 필요하다.

기본 활성열 이동 로직

활성열 하나가 만료될 때마다 1명씩 대기열에서 이동한다.
카운터 확인 → 대기열 추출 → 슬롯 생성 Lua로 원자적 처리한다.

멀티 인스턴스에서 모든 구독자가 메시지를 수신한다.
하지만 Lua의 원자성으로 하나의 인스턴스만 promote가 성공한다.
나머지 인스턴스들은 활성열에 여유 슬롯이 없어 return 한다.

-- promote-single.lua

-- KEYS[1] = waitKey
-- KEYS[2] = activeCountKey

-- ARGV[1] = slotKeyPrefix
-- ARGV[2] = ACTIVE_MAX
-- ARGV[3] = TTL(초)

local count = tonumber(redis.call('GET', KEYS[2]) or '0')
if count >= tonumber(ARGV[2]) then return 0 end

local top = redis.call('ZRANGE', KEYS[1], 0, 0)
if #top == 0 then return -1 end

local member = top[1]
local uid = string.match(member, 'user:(.-):token:')

redis.call('SETEX', ARGV[1] .. uid, ARGV[3], member)
redis.call('ZREM', KEYS[1], member)
redis.call('INCR', KEYS[2])
return 1
// ... 기존 상수들
private const val ACTIVE_COUNT_KEY = "active:count:%d"
private const val ACCESS_TTL_SECONDS = ACCESS_TTL_MILLIS / 1000L

private val script = RedisScript.of(ClassPathResource("promote-single.lua"), Long::class.java)

fun promote(pid: Long) {
	val keys = listOf(WAIT_KEY.format(pid), ACTIVE_COUNT_KEY.format(pid))
    val result = redis.execute(script, keys,
    	"active:slot:$pid:", 
        ACTIVE_MAX.toString(), 
        ACCESS_TTL_SECONDS.toString()
	)

    if (result == -1L)
        redis.opsForSet().remove(ACTIVE_PRODUCTS_KEY, pid.toString())
}

Pub/Sub

활성열의 구조는 스케줄러 방식과 동일하다. (Sorted SetScore에 만료시각 저장)
달라지는 것은 활성열 이동 로직의 트리거뿐이다.

스케줄러로 실행시키는 것 대신, 유저가 결제한 순간 해당 멤버를 삭제하고 바로 이벤트를 발행한다.

컴포넌트

@Component
class SlotAvailableListener(
    private val queueManager: WaitingQueueManager, // 대기열 관리 컴포넌트
) : MessageListener {
    override fun onMessage(message: Message, pattern: ByteArray?) {
        val pid = String(message.body).toLongOrNull() ?: return
        queueManager.promote(pid) // 대기열 -> 활성열 이동
    }
}

설정

@Configuration
class PubSubConfig(
    private val connectionFactory: RedisConnectionFactory,
    private val listener: SlotAvailableListener,
) {
    @Bean
    fun pubSubListenerContainer(): RedisMessageListenerContainer {
        val container = RedisMessageListenerContainer()

        container.setConnectionFactory(connectionFactory)
        container.addMessageListener(listener, PatternTopic("queue:slot-available:*"))

        return container
    }
}

활성열 이동

활성열이 스케줄러와 동일한 Sorted Set이므로, 스케줄러에서 사용한 promote.lua를 그대로 사용한다.
차이점은 전체 상품을 순회하지 않고, 이벤트가 발생한 해당 상품에 대해서만 실행한다.

private val script = RedisScript.of(ClassPathResource("promote.lua"), Long::class.java)

fun promote(pid: Long) {
    val keys = listOf(ACTIVE_KEY.format(pid), WAIT_KEY.format(pid))
    val now = System.currentTimeMillis()
    val expireAt = now + ACCESS_TTL_MILLIS

    val result = redis.execute(script, keys,
        now.toString(), expireAt.toString(), ACTIVE_MAX.toString()
    )

    if (result == -1L)
        redis.opsForSet().remove(ACTIVE_PRODUCTS_KEY, pid.toString())
}

멀티 인스턴스에서 모든 구독자가 메시지를 수신한다.
하지만 promote.lua가 원자적으로 실행되므로 첫 번째 인스턴스만 promote에 성공하고,
나머지는 여유 슬롯이 없어 0을 반환하며 즉시 종료된다.
(if spares == 0 then return 0 end 부분)

보완용 스케줄러

명시적인 결제 완료만 감지할 뿐, 결제 없이 이탈할 경우 활성열에 남는다.
때문에 스케줄러가 필요하다.

@Scheduled(fixedDelay = 30_000) // 30초
fun promoteAll() {
    val ids = redis.opsForSet().members(ACTIVE_PRODUCTS_KEY) ?: return
    ids.forEach { pid -> promote(pid.toLong()) }
}

promote.lua가 먼저 ZREMRANGEBYSCORE로 만료된 멤버를 정리하므로,
promote(pid)를 호출하는 것만으로 정리와 충원이 동시에 이루어진다.
Pub/Sub이 정상 동작하는 동안에는 여유 슬롯이 없어 Lua에서 즉시 종료되므로 부하가 거의 없다.

Keyspace Notification

활성 슬롯이 만료되는 순간 레디스가 자동으로 이벤트를 발행한다.
이를 구독해서 promote를 트리거하는 방식이다.

Pub/Sub과 달리 서버가 명시적으로 이벤트를 발행할 필요가 없다.
유저가 결제를 완료하든, 결제 없이 이탈하든, 슬롯이 만료되면 자동으로 다음 유저가 진입한다.

활성열 구조 변경

Sorted SetScore 기반 만료는 키 만료 이벤트를 발생시키지 않는다.
Keyspace Notification을 사용하려면 활성열 구조를 실제 TTL이 있는 개별 키로 변경해야 한다.

  • 스케줄러 & Pub/Sub:
    - active:id:{pid} (Sorted Set, score = 만료시각)
  • Keyspace Notification:
    - active:slot:{pid}:{uid} (개별 키, TTL = 10분)
    - active:count:{pid} (활성 인원 카운터)

Sorted Set에서는 ZCARD로 O(1)에 활성 인원을 조회할 수 있었다.
하지만 개별 키 방식에서는 키를 일일이 셀 수 없으므로 별도 카운터 active:count:{pid}를 사용한다.

// 기존 상수에 추가
private const val ACTIVE_COUNT_KEY = "active:count:%d"
private const val ACCESS_TTL_SECONDS = ACCESS_TTL_MILLIS / 1000L

설정

Keyspace Notification은 기본적으로 비활성화되어 있다.
때문에 추가적인 설정이 필요하다.

# application.yml
spring:
  data:
    redis:
      notify-keyspace-events: Ex   # E = Keyevent, x = expired
@Configuration
class KeyspaceNotificationConfig(
    private val connectionFactory: RedisConnectionFactory,
    private val listener: ActiveSlotExpiredListener,
) {
    @Bean
    fun redisMessageListenerContainer(): RedisMessageListenerContainer {
        val container = RedisMessageListenerContainer()
        
        container.setConnectionFactory(connectionFactory)
        container.addMessageListener(listener, PatternTopic("__keyevent@0__:expired"))

        return container
    }
}

리스너

@Component
class ActiveSlotExpiredListener(
    private val redis: StringRedisTemplate,
    private val queueManager: WaitingQueueManager,
) : MessageListener {

    override fun onMessage(message: Message, pattern: ByteArray?) {
        val key = String(message.body) // "active:slot:42:userA"
        if (!key.startsWith("active:slot:")) return

        val pid = key.removePrefix("active:slot:")
            .substringBefore(":").toLongOrNull() ?: return

        redis.opsForValue().decrement(ACTIVE_COUNT_KEY.format(pid))
        queueManager.promote(pid)
    }
}

활성열 슬롯 하나가 만료될 때마다 이벤트가 발생한다.
카운터를 1 감소시키고 대기열에서 1명을 promote한다.

활성열 이동

카운터 확인 → 대기열 추출 → 슬롯 생성을 Lua로 원자적 처리한다.

-- promote-single.lua
-- KEYS[1] = waitKey
-- KEYS[2] = activeCountKey

-- ARGV[1] = slotKeyPrefix
-- ARGV[2] = ACTIVE_MAX
-- ARGV[3] = TTL(초)

local count = tonumber(redis.call('GET', KEYS[2]) or '0')
if count >= tonumber(ARGV[2]) then return 0 end

local top = redis.call('ZRANGE', KEYS[1], 0, 0)
if #top == 0 then return -1 end

local member = top[1]
local uid = string.match(member, 'user:(.-):token:')

redis.call('SETEX', ARGV[1] .. uid, ARGV[3], member)
redis.call('ZREM', KEYS[1], member)
redis.call('INCR', KEYS[2])
return 1

Pub/Sub의 promote.lua와는 구조가 완전히 다르다.
Pub/Sub은 활성열이 Sorted Set이라 ZREMRANGEBYSCORE로 만료 멤버를 정리하고 여러 명을 한 번에 이동시킨다.
여기서는 활성열이 개별 키(TTL)이므로 만료 정리가 필요 없고, 1명씩 이동시킨다.

private val script = RedisScript.of(
    ClassPathResource("promote-single.lua"), Long::class.java
)

fun promote(pid: Long): Long {
    val keys = listOf(WAIT_KEY.format(pid), ACTIVE_COUNT_KEY.format(pid))
    val result = redis.execute(script, keys,
        "active:slot:$pid:",
        ACTIVE_MAX.toString(),
        ACCESS_TTL_SECONDS.toString()
    ) ?: 0L

    if (result == -1L)
        redis.opsForSet().remove(ACTIVE_PRODUCTS_KEY, pid.toString())

    return result
}

멀티 인스턴스에서 모든 구독자가 만료 이벤트를 수신한다.
하지만 Lua의 원자성 덕분에 하나의 인스턴스만 promote에 성공하고,
나머지는 카운터가 이미 ACTIVE_MAX에 도달해 0을 반환한다.

멀티 인스턴스 환경 주의사항

키 하나가 만료되면 모든 인스턴스가 동일한 이벤트를 수신한다.
리스너에서 각자 DECR을 실행하면 카운터가 N만큼 감소하지만,
이후 promote-single.luaINCR은 Lua 원자성으로 1개 인스턴스만 실행된다.

예시 (3개 인스턴스, 슬롯 1개 만료):

Before: counter = 10, 실제 활성 슬롯 = 10

3개 인스턴스 각각 DECR → counter = 7
인스턴스 A: Lua 성공, INCR → counter = 8, 슬롯 1개 생성
인스턴스 B: Lua 성공, INCR → counter = 9, 슬롯 1개 생성
인스턴스 C: Lua 성공, INCR → counter = 10, 슬롯 1개 생성

After: counter = 10, 실제 활성 슬롯 = 12 → ACTIVE_MAX 초과

완전한 정합성이 필요하다면 DECR과 promote를 하나의 Lua 스크립트로 합치고,
만료된 키를 식별하는 NX 락으로 중복 처리를 방지해야 한다.
단일 인스턴스 환경에서는 이 문제가 발생하지 않는다.

profile
ㅇ0ㅇ

0개의 댓글