RedissonLock의 tryLock() 메서드 내부 구현 살펴보기

이상훈·2025년 4월 29일

Project

목록 보기
11/17

Redisson은 Java 언어로 구현된 Redis 기반 분산락 클라이언트이다. Redisson은 RLock 인터페이스를 통해 Redis 기반의 분산락을 제공한다. 대표적인 구현체로는 다음과 같다.

  • RedissonLock: 단일 키 기반의 기본 분산락 구현체

  • RedissonMultiLock: 여러 개의 락을 동시에 걸어야 할 때 사용하는 멀티락 구현체

  • RedissonRedLock: Redis RedLock 알고리즘을 기반으로 한 구현체 (여러 Redis 인스턴스를 활용)

이 중 RedissonLock은 가장 기본이 되는 구현체로 내부적으로 Lua 스크립트, Pub/Sub을 활용한다.


RedissonLock

분산락을 구현하는 로직

public void method() {
    RLock lock = redissonClient.getLock(key); // 1

    try {
        boolean available = lock.tryLock(waitTime, leaseTime, timeUnit); // 2
      
        if (available) { // 3
        // business logic
        }
    }
    
    catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    
    finally {
        lock.unlock(); // 4
    }
}
  1. key로 RLock 객체를 가져온다.
  2. tryLock 메서드를 호출해 락 획득을 시도한다.
    • waitTime : 락 획득을 기다리는 시간
    • leaseTime : 락의 대여 시간
    • unit : 시간 단위
  3. 락 획득에 성공하면 이후 비즈니스 로직을 진행한다.
  4. 락을 해제한다.

tryLock()

1. 락 획득 시도

우선 tryAcquire()를 통해 락 획득을 시도한다. 락이 존재하지 않거나 이미 자신이 소유한 경우에는 락을 획득하고 null을 반환하며, 이는 락 획득 성공을 의미한다. 그렇지 않다면 TTL(남은 점유 시간)을 반환하고 다음 단계로 진행한다.

Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
    return true;
}
  • 🔍 tryAcquire 내부
    tryAcquire() 메서드는 내부적으로 Redis에 락을 획득하기 위해 Lua script를 사용한다. 만약 Lua script 없이 일반 redis 명령어 조합으로 구현한다면, 락 존재 여부 확인 -> 락 획득 시도 사이에 다른 클라이언트가 개입할 수 있어 결국 여러 클라이언트가 동시에 락을 획득해버리는 정합성 문제가 발생할 수 있다. 이를 방지하기 위해 redisson은 락 존재 여부 확인 -> 락 획득 시도를 하나의 lua script로 묶어 원자적으로 실행한다. 참고로 락 해제 시에도 동일한 이유로 Lua Script를 사용한다.

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command
    ) {
       return this.commandExecutor.syncedEval(
           this.getRawName(),
           LongCodec.INSTANCE,
           command,
           // Lua script
           "if ((redis.call('exists', KEYS[1]) == 0) or " +
           "    (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
           "    redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
           "    redis.call('pexpire', KEYS[1], ARGV[1]); " +
           "    return nil; " +
           "end; " +
           "return redis.call('pttl', KEYS[1]);",
           Collections.singletonList(this.getRawName()),
           new Object[]{
               unit.toMillis(leaseTime),         // ARGV[1]: 락 TTL
               this.getLockName(threadId)       // ARGV[2]: 락 소유자 (UUID + Thread ID)
           }
       );
    }

2. waitTime이 초과되었는지 확인

TTL이 남아있는 상황에서 락을 얻지 못했을 경우, 현재까지 대기한 시간을 반영하여 waitTime이 초과되었는지 확인한다. 초과되었다면 실패 처리 후 false를 반환한다.

time -= System.currentTimeMillis() - current;
if (time <= 0L) {
    this.acquireFailed(waitTime, unit, threadId);
    return false;
}

3. pub/sub 채널 구독 대기

다른 스레드가 락을 해제하면 Redis pub/sub 채널을 통해 해당 이벤트를 받을 수 있도록 구독을 수행한다. 만약 채널을 구독하여 대기하다가 waitTime이 지나면 TimeOutException이 발생한다.

CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);

try {
    subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException var21) {
    if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
        subscribeFuture.whenComplete((res, ex) -> {
            if (ex == null) {
                this.unsubscribe(res, threadId);
            }

        });
    }

    this.acquireFailed(waitTime, unit, threadId);
    return false;
}
  • 🔍 subscribe() 내부
    subscribe() 메서드 내부에는 세마포어가 존재한다. Redisson은 세마포어를 활용하여 subscribe() 로직에 진입할 수 있는 동시 스레드 수를 제한한다. 세마포어를 획득한 스레드만이 Redis에 subscribe 요청을 보내고, 락 해지 메시지를 수신할 수 있는 상태가 된다. 세마포어를 획득하지 못한 스레드들은 대기하며, Redis에 불필요한 subscribe 요청이 몰리는 것을 방지하여 Redis 부하를 줄일 수 있다.
public CompletableFuture<E> subscribe(String entryName, String channelName) {
        AsyncSemaphore semaphore = this.service.getSemaphore(new ChannelName(channelName));
        CompletableFuture<E> newPromise = new CompletableFuture();
        semaphore.acquire().thenAccept((c) -> {
            if (newPromise.isDone()) {
                semaphore.release();
			...
       

4. 락 재시도 루프 (TTL 기반 polling + pub/sub 이벤트 대기)

세마포어를 획득한 스레드만 subscribe()를 완료한 뒤, 락 재시도 루프에 진입한다.

  1. tryAcquire()를 호출하여 락 획득을 시도한다.
    1.1 락을 획득하면 즉시 true를 반환하고 종료한다.
    1.2 락을 획득하지 못하면, 해당 락의 TTL(Time To Live)을 반환받는다.
  2. 반환된 TTL만큼 대기하며, 락 해제 이벤트(Pub/Sub)를 기다린다.
  3. TTL이 만료되거나 락 해제 이벤트를 수신하면 다시 1번으로 돌아간다.

이 과정을 남은 waitTime 내에서 반복하며, waitTime이 초과되면 락 획득을 포기하고 false를 반환한다. 참고로 TTL 만료 시 직접 tryAcquire()를 다시 호출하는 이유는, Pub/Sub 메시지가 반드시 수신된다는 보장이 없기 때문이다.

do {
    long currentTime = System.currentTimeMillis();

    // 락 재시도 시도
    ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);

    // 락 획득 성공 시 루프 종료
    if (ttl == null) {
        return true;
    }

    // tryAcquire 시도에 소요된 시간 차감
    time -= System.currentTimeMillis() - currentTime;
    if (time <= 0L) {
        this.acquireFailed(waitTime, unit, threadId);
        return false;
    }

    currentTime = System.currentTimeMillis();

    // TTL이 남은 대기 시간보다 짧으면 TTL 만큼 대기
    if (ttl >= 0L && ttl < time) {
        ((RedissonLockEntry) this.commandExecutor.getNow(subscribeFuture))
            .getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
        // 그 외에는 남은 전체 시간만큼 대기
        ((RedissonLockEntry) this.commandExecutor.getNow(subscribeFuture))
            .getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    }

    // 대기한 시간만큼 차감
    time -= System.currentTimeMillis() - currentTime;

} while (time > 0L);
                                             

전체 로직

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
        if (ttl == null) {
            return true;
        } else {
            time -= System.currentTimeMillis() - current;
            if (time <= 0L) {
                this.acquireFailed(waitTime, unit, threadId);
                return false;
            } else {
                current = System.currentTimeMillis();
                CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);

                try {
                    subscribeFuture.get(time, TimeUnit.MILLISECONDS);
                } catch (TimeoutException var21) {
                    if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
                        subscribeFuture.whenComplete((res, ex) -> {
                            if (ex == null) {
                                this.unsubscribe(res, threadId);
                            }

                        });
                    }

                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                } catch (ExecutionException var22) {
                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                boolean currentTime;
                try {
                    time -= System.currentTimeMillis() - current;
                    if (time > 0L) {
                        do {
                            long currentTime = System.currentTimeMillis();
                            ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                            if (ttl == null) {
                                boolean var32 = true;
                                return var32;
                            }

                            time -= System.currentTimeMillis() - currentTime;
                            if (time <= 0L) {
                                this.acquireFailed(waitTime, unit, threadId);
                                boolean var31 = false;
                                return var31;
                            }

                            currentTime = System.currentTimeMillis();
                            if (ttl >= 0L && ttl < time) {
                                ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                            } else {
                                ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                            }

                            time -= System.currentTimeMillis() - currentTime;
                        } while(time > 0L);

                        this.acquireFailed(waitTime, unit, threadId);
                        boolean var16 = false;
                        return var16;
                    }

                    this.acquireFailed(waitTime, unit, threadId);
                    currentTime = false;
                } finally {
                    this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
                }

                return currentTime;
            }
        }
    }

profile
Problem Solving과 기술적 의사결정을 중요시합니다.

0개의 댓글