Redisson tryLock 동작 과정

개발하는 구황작물·2024년 4월 4일
0

나름대로 코드 뜯어보았는데 틀린 부분이 있을 수 있습니다. 잘못된 부분이 있다면 알려주세요...

Redisson tryLock 동작 과정

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); //(1)
        if (ttl == null) { // (2)
            return true;
        } else {
            time -= System.currentTimeMillis() - current;
            if (time <= 0L) { 
                this.acquireFailed(waitTime, unit, threadId);
                return false;
            } else {
                current = System.currentTimeMillis();
                CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);

                try { // (3)
                    subscribeFuture.get(time, TimeUnit.MILLISECONDS);
                } catch (TimeoutException var21) {
                    if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
                        subscribeFuture.whenComplete((res, ex) -> {
                            if (ex == null) {
                                this.unsubscribe(res, threadId);
                            }

                        });
                    }

                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                } catch (ExecutionException var22) {
                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                try {
                    time -= System.currentTimeMillis() - current;
                    if (time <= 0L) {
                        this.acquireFailed(waitTime, unit, threadId);
                        boolean var24 = false;
                        return var24;
                    } else {
                        boolean var16;
                        do { // (4)
                            long currentTime = System.currentTimeMillis();
                            ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                            if (ttl == null) {
                                var16 = true;
                                return var16;
                            }

                            time -= System.currentTimeMillis() - currentTime;
                            if (time <= 0L) {
                                this.acquireFailed(waitTime, unit, threadId);
                                var16 = false;
                                return var16;
                            }

                            currentTime = System.currentTimeMillis();
                            if (ttl >= 0L && ttl < time) {
                                ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                            } else {
                                ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                            }

                            time -= System.currentTimeMillis() - currentTime;
                        } while(time > 0L);

                        this.acquireFailed(waitTime, unit, threadId);
                        var16 = false;
                        return var16;
                    }
                } finally {
                    this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
                }
            }
        }
    }

(1)

Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);

tryAcquire() 내부 로직을 보면 아래와 같이 lua 스크립트를 쓰는 것을 볼 수 있다.

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return this.evalWriteSyncedAsync(this.getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
    }
"if ((redis.call('exists', KEYS[1]) == 0) // LOCK KEY 가 존재하는지 확인(없으면 0, 있으면 1)
or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) // 해시맵 기반으로 LOCK KEY와 스테드 아이디로 존재는지 확인(있으면 0, 존재하지 않으면 저장 후 1리턴)
then redis.call('hincrby', KEYS[1], ARGV[2], 1);  // LOCK KEY 가 존재하지 않으면 LOCK KEY와 쓰레드 아이디를 기반으로 값을 1증가 시킨다.
redis.call('pexpire', KEYS[1], ARGV[1]); // LOCK KEY에 유효시간을 설정한다.
return nil; // null을 반환한다.
end; 
return redis.call('pttl', KEYS[1]);" // 만약 위의 조건들이 모두 false 면 LOCK KEY의 TTL 시간을 리턴한다.

위의 과정을 통해 만약 대기가 없는 경우 바로 락을 획득하고 null을 리턴하고 아니라면 LOCK KEY의 TTL 시간을 리턴한다.

(2)

if (ttl == null) {
            return true;
        } else {
            time -= System.currentTimeMillis() - current;
            if (time <= 0L) {
                this.acquireFailed(waitTime, unit, threadId);
                return false;
            }
            ...

만약 ttl 이 null이라면 대기가 없다는 뜻이므로 락을 얻고 true 를 리턴한다
아니라면 waitingTime을 초과했는지 확인하고 초과했다면 false 를 리턴한다.

(3)

current = System.currentTimeMillis();
                CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId); // (3-1)

                try {
                    subscribeFuture.get(time, TimeUnit.MILLISECONDS);
                } catch (TimeoutException var21) {
                    if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
                        subscribeFuture.whenComplete((res, ex) -> {
                            if (ex == null) {
                                this.unsubscribe(res, threadId);
                            }

                        });
                    }

                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                } catch (ExecutionException var22) {
                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                }

여기서 (3-1)의 subscribe를 살펴보면

// RedissonLock.class
protected CompletableFuture<RedissonLockEntry> subscribe(long threadId) {
        return this.pubSub.subscribe(this.getEntryName(), this.getChannelName());
    }

// PublishSubscribe.class

public CompletableFuture<E> subscribe(String entryName, String channelName) {
        AsyncSemaphore semaphore = this.service.getSemaphore(new ChannelName(channelName));
        CompletableFuture<E> newPromise = new CompletableFuture();
        semaphore.acquire().thenAccept((c) -> {
            if (newPromise.isDone()) {
                semaphore.release();
            } else {
            ...
            }

pubsub.subscribe()를 사용하고 있다는 것을 알 수 있다.

더 깊게 확인해보면

semaphore를 활용하는 것을 볼 수 있다.
semaphore를 통해 하나의 스레드가 락을 획득하면 다른 스레드들이 접근하지 못하도록 제어한다.

이후 CompletableFuture get() 메서드를 통해 락 획득이 가능할 때까지 대기한다.

(4)
이후 다시 (2)에서 했던 것처럼 시간 초과가 되었는지 확인한 후

do {
	long currentTime = System.currentTimeMillis();
    ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) { // (4-1)
      var16 = true;
      return var16;
        
    }

    time -= System.currentTimeMillis() - currentTime;
    if (time <= 0L) {
      this.acquireFailed(waitTime, unit, threadId);
      var16 = false;
      return var16;
    }

    currentTime = System.currentTimeMillis();
    if (ttl >= 0L && ttl < time) { // (4-2)
      ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
      ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
    }
    time -= System.currentTimeMillis() - currentTime;
  } while(time > 0L);

(4-1)
락을 획득을 다시 시도하여 성공했으면 true를 리턴하고

(4-2)
threadId로 구독한 객체로 유효시간동안 lock이 가능한지 확인한다.

위의 과정을 do ~ while 문으로 시간이 다 될 때까지 루프를 탄다.

정리

정리하자면

  1. tryLock()로 락 획득을 시도
  2. 락을 획득했으면 true 리턴
  3. 락 획득 실패시 waitingTime 확인 후 시간이 남았으면 pubsub 구독 후 다시 락 획득이 가능하면 다시 시도
  4. 1 - 3 과정을 waitingTime이 다 될 때까지 무한반복
profile
어쩌다보니 개발하게 된 구황작물

0개의 댓글