public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); //(1)
if (ttl == null) { // (2)
return true;
} else {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
try { // (3)
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException var21) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
this.unsubscribe(res, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException var22) {
this.acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var24 = false;
return var24;
} else {
boolean var16;
do { // (4)
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
} finally {
this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
}
}
}
}
(1)
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
tryAcquire() 내부 로직을 보면 아래와 같이 lua 스크립트를 사용하는 것을 알 수 있습니다. 이로 인해 속도에 이점을 얻을 수 있습니다.
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.evalWriteSyncedAsync(this.getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
"if ((redis.call('exists', KEYS[1]) == 0) // LOCK KEY 가 존재하는지 확인(없으면 0, 있으면 1)
or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) // 해시맵 기반으로 LOCK KEY와 스테드 아이디로 존재는지 확인(있으면 0, 존재하지 않으면 저장 후 1리턴)
then redis.call('hincrby', KEYS[1], ARGV[2], 1); // LOCK KEY 가 존재하지 않으면 LOCK KEY와 쓰레드 아이디 기반으로 값 1증가.
redis.call('pexpire', KEYS[1], ARGV[1]); // LOCK KEY에 유효시간을 설정한다.
return nil; // null 반환.
end;
return redis.call('pttl', KEYS[1]);" // 만약 위의 조건들이 모두 false라면 LOCK KEY TTL 시간 리턴.
위의 과정을 통해 만약 대기가 없는 경우 바로 락을 획득 후 null을 리턴하고 아니라면 LOCK KEY의 TTL 시간을 리턴합니다.
(2)
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
}
...
만약 ttl 이 null이라면 대기가 없다는 뜻이므로 락을 얻고 true 를 리턴하고,
아니라면 waitingTime을 초과했는지 한 후, 초과했다면 false 를 리턴합니다.
(3)
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId); // (3-1)
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException var21) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException("Unable to acquire subscription lock after " + time + "ms. Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
this.unsubscribe(res, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException var22) {
this.acquireFailed(waitTime, unit, threadId);
return false;
}
subscribe 메서드를 통해 threadId를 채널로 구독하고, CompletableFuture get() 메서드를 통해 락 획득이 가능할 때까지 대기합니다.
// RedissonLock.class
protected CompletableFuture<RedissonLockEntry> subscribe(long threadId) {
return this.pubSub.subscribe(this.getEntryName(), this.getChannelName());
}
// PublishSubscribe.class
public CompletableFuture<E> subscribe(String entryName, String channelName) {
AsyncSemaphore semaphore = this.service.getSemaphore(new ChannelName(channelName));
CompletableFuture<E> newPromise = new CompletableFuture();
semaphore.acquire().thenAccept((c) -> {
if (newPromise.isDone()) {
semaphore.release();
} else {
...
}
더 깊게 확인해보면
semaphore를 활용하는 것을 볼 수 있습니다.
semaphore를 통해 하나의 스레드가 락을 획득하면 다른 스레드들이 접근하지 못하도록 제어합니다.
(4)
이후 다시 (2)에서 했던 것처럼 시간 초과가 되었는지 확인한 후
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) { // (4-1)
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) { // (4-2)
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
(4-1)
락을 획득을 다시 시도하여 성공했으면 true를 리턴하고
(4-2)
threadId로 구독한 객체로 유효시간동안 lock이 가능한지 확인합니다.
위의 과정을 do ~ while 문으로 시간이 다 될 때까지 루프를 탑니다.
정리하자면
의 과정을 거치게 됩니다.
pub/sub를 활용하여 부하가 스핀락보다는 부하가 덜 든다고 하여 구조가 아예 다른 줄 알았는데, 스핀락의 원리를 어느정도 활용한 것을 확인할 수 있었습니다.
다만 스핀락처럼 계속 확인하는 것이 아닌, 락이 풀렸다는 알림이 올 때 락 획득을 시도한다는 점에서 부하 감소가 발생하는 것 같습니다.