Redisson은 Java 언어로 구현된 Redis 기반 분산락 클라이언트이다. Redisson은 RLock 인터페이스를 통해 Redis 기반의 분산락을 제공한다. 대표적인 구현체로는 다음과 같다.
RedissonLock: 단일 키 기반의 기본 분산락 구현체
RedissonMultiLock: 여러 개의 락을 동시에 걸어야 할 때 사용하는 멀티락 구현체
RedissonRedLock: Redis RedLock 알고리즘을 기반으로 한 구현체 (여러 Redis 인스턴스를 활용)
이 중 RedissonLock은 가장 기본이 되는 구현체로 내부적으로 Lua 스크립트, Pub/Sub을 활용한다.
public void method() {
RLock lock = redissonClient.getLock(key); // 1
try {
boolean available = lock.tryLock(waitTime, leaseTime, timeUnit); // 2
if (available) { // 3
// business logic
}
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
finally {
lock.unlock(); // 4
}
}
우선 tryAcquire()를 통해 락 획득을 시도한다. 락이 존재하지 않거나 이미 자신이 소유한 경우에는 락을 획득하고 null을 반환하며, 이는 락 획득 성공을 의미한다. 그렇지 않다면 TTL(남은 점유 시간)을 반환하고 다음 단계로 진행한다.
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
}
🔍 tryAcquire 내부
tryAcquire() 메서드는 내부적으로 Redis에 락을 획득하기 위해 Lua script를 사용한다. 만약 Lua script 없이 일반 redis 명령어 조합으로 구현한다면, 락 존재 여부 확인 -> 락 획득 시도 사이에 다른 클라이언트가 개입할 수 있어 결국 여러 클라이언트가 동시에 락을 획득해버리는 정합성 문제가 발생할 수 있다. 이를 방지하기 위해 redisson은 락 존재 여부 확인 -> 락 획득 시도를 하나의 lua script로 묶어 원자적으로 실행한다. 참고로 락 해제 시에도 동일한 이유로 Lua Script를 사용한다.
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command
) {
return this.commandExecutor.syncedEval(
this.getRawName(),
LongCodec.INSTANCE,
command,
// Lua script
"if ((redis.call('exists', KEYS[1]) == 0) or " +
" (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
" redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
" redis.call('pexpire', KEYS[1], ARGV[1]); " +
" return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getRawName()),
new Object[]{
unit.toMillis(leaseTime), // ARGV[1]: 락 TTL
this.getLockName(threadId) // ARGV[2]: 락 소유자 (UUID + Thread ID)
}
);
}
TTL이 남아있는 상황에서 락을 얻지 못했을 경우, 현재까지 대기한 시간을 반영하여 waitTime이 초과되었는지 확인한다. 초과되었다면 실패 처리 후 false를 반환한다.
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
}
다른 스레드가 락을 해제하면 Redis pub/sub 채널을 통해 해당 이벤트를 받을 수 있도록 구독을 수행한다. 만약 채널을 구독하여 대기하다가 waitTime이 지나면 TimeOutException이 발생한다.
CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException var21) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
this.unsubscribe(res, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
}
public CompletableFuture<E> subscribe(String entryName, String channelName) {
AsyncSemaphore semaphore = this.service.getSemaphore(new ChannelName(channelName));
CompletableFuture<E> newPromise = new CompletableFuture();
semaphore.acquire().thenAccept((c) -> {
if (newPromise.isDone()) {
semaphore.release();
...
세마포어를 획득한 스레드만 subscribe()를 완료한 뒤, 락 재시도 루프에 진입한다.
이 과정을 남은 waitTime 내에서 반복하며, waitTime이 초과되면 락 획득을 포기하고 false를 반환한다. 참고로 TTL 만료 시 직접 tryAcquire()를 다시 호출하는 이유는, Pub/Sub 메시지가 반드시 수신된다는 보장이 없기 때문이다.
do {
long currentTime = System.currentTimeMillis();
// 락 재시도 시도
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
// 락 획득 성공 시 루프 종료
if (ttl == null) {
return true;
}
// tryAcquire 시도에 소요된 시간 차감
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
}
currentTime = System.currentTimeMillis();
// TTL이 남은 대기 시간보다 짧으면 TTL 만큼 대기
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry) this.commandExecutor.getNow(subscribeFuture))
.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 그 외에는 남은 전체 시간만큼 대기
((RedissonLockEntry) this.commandExecutor.getNow(subscribeFuture))
.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 대기한 시간만큼 차감
time -= System.currentTimeMillis() - currentTime;
} while (time > 0L);
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException var21) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
this.unsubscribe(res, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException var22) {
this.acquireFailed(waitTime, unit, threadId);
return false;
}
boolean currentTime;
try {
time -= System.currentTimeMillis() - current;
if (time > 0L) {
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
boolean var32 = true;
return var32;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var31 = false;
return var31;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
boolean var16 = false;
return var16;
}
this.acquireFailed(waitTime, unit, threadId);
currentTime = false;
} finally {
this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
}
return currentTime;
}
}
}