Redisson 을 사용한 분산 락

wellbeing-dough·2024년 7월 1일

문제 상황

  • 예전에 그룹 스터디 참여 로직에 Pessimistic Lock(이하 비관적 락)을 이용하여 동시성 이슈를 처리했었다
    링크: https://velog.io/@wellbeing-dough/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B6%80%ED%8A%B8-%EB%8F%99%EC%8B%9C%EC%84%B1-%EC%B2%98%EB%A6%AC

  • 하지만 비관적 락은 락을 거는 동안 다른 트랜잭션이 대기 상태가 되어 성능 저하가 발생할 수 있다 락을 거는 동안 다른 트랜잭션이 대기 상태가 되기 때문이다 특히 트랜잭션이 길어질 경우 문제가 된다.

  • 여러 트랜잭션이 동시에 락을 요청할 때 교착 상태가 발생할 수 있다 이를 해결하기 위해 복잡한 트랜잭션 관리와 교착 상태 회피 알고리즘이 필요하다

  • 데이터베이스 서버의 부하가 증가할 수 있다. 스터디헙 인프라는 오토 스케일링으로 ec2는 늘릴 수 있지만 데이터베이스 이중화는 되어있지 않다.

  • 비관적 락은 단일 데이터베이스 서버의 자원에 의존하므로, 대규모 분산 시스템에서는 확장성이 제한될 수 있다

이런 단점으로 redis를 사용해서 락을 구현하기로 했다

대표적으로 lettuce와 redisson 이 있는데 차이점은

lettuce는 스핀 락 방식으로 구현되어있고 분산 락 기능이 따로 제공되지 않아 따로 직접 구현이 필요하다 하지만 redisson은 pub/sub 구조로 분산락 기능이 기본적으로 제공된다
스핀락 방식은 lock이 해제되었는지 주기적으로 retry를 해야하므로 CPU 리소스를 많이 소모할 수 있다
분산 락 방식은 락에 TTL(Time-To-Live)을 설정하여 프로세스 중단 시 락이 자동으로 해제될 수 있게 한다. 이는 락의 영구적 잔류 문제를 방지한다

그래서 redisson을 사용하여 동시성 이슈를 해결해 보겠다

문제 해결

동시성 이슈에 대한 설명은 예전에 https://velog.io/@wellbeing-dough/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B6%80%ED%8A%B8-%EB%8F%99%EC%8B%9C%EC%84%B1-%EC%B2%98%EB%A6%AC 여기서 알아봤으니 따로 하지 않고 구현만 해보자

일단 pub/sub 에 대해서 알아보면
기본적으로
Publisher: 특정 채널에 메시지를 발행하는 클라이언트입니다. 발행자는 메시지를 특정 채널에 보낼 뿐, 누가 그 메시지를 수신하는지는 알지 못합니다.
Subscriber: 특정 채널을 구독하는 클라이언트입니다. 구독자는 자신이 구독한 채널에 발행된 모든 메시지를 수신합니다.

두개의 터미널에서 redis-cli로 접속 해 보자

첫번째 터미널에서는 subscribe ch1을해보자

이러면 ch1이라는 채널을 구독한다

두번째 터미널에서는 publish ch1 hello!라고 해보자

그러면 첫번째 터미널 즉 ch1을 구독하는 곳에서 hello!라는 메세지를 확인할 수 있다

redis는 자신이 점유하고 있는 락을 해제할 때 채널에 메세지를 보내줘서 락을 획득해야 하는 스레드들에게 락을 획득하라고 전해준다 그러면 그 메세지를 받은 스레드들은 락 획득을 시도하게 된다

이렇게 하면 retry를 사용한 스핀락 방식 보다 redis의 부하를 줄여준다

redisson 라이브러리를 추가했는데

    implementation 'org.redisson:redisson-spring-boot-starter:3.17.4'
"Failed to start bean 'documentationPluginsBootstrapper'; nested exception is java.lang.NullPointerException"

무슨 스웨거에서 빌드 에러가 뜬다

https://www.inflearn.com/questions/625844/%EC%84%A0%EC%83%9D%EB%8B%98%EC%9D%B4-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-spring-%EB%B2%84%EC%A0%84-%EA%B8%B0%EC%A4%80%EC%9C%BC%EB%A1%9C-%EC%8B%A4%EB%AC%B4-%ED%86%B5%ED%95%B4-redisson-%EC%A0%81%EC%9A%A9%EC%8B%9C-%EB%AC%B8%EC%A0%9C

구글링 해봤는데 이런 도움이.... 너무 감사합니다

https://github.com/springfox/springfox/issues/3462#issuecomment-979548234 여기에 해결책이 나와있다

@Component
@RequiredArgsConstructor
@Slf4j
public class StudyPostApplyEventPublisher {

    private final RedissonClient redissonClient;
    private final StudyPostWriter studyPostWriter;
    private final StudyPostReader studyPostReader;

        @Transactional
    public void acceptApplyEventPublish(Long studyId) {
        StudyPostEntity studyPost = studyPostRepository.findByIdWithPessimisticLock(studyId).orElseThrow(PostNotFoundException::new);
        studyPost.decreaseRemainingSeat();
        studyPost.closeStudyPostIfRemainingSeatIsZero();
        studyPostRepository.save(studyPost);
    }


}

기존에 이렇게 낙관적 락을 수행한 부분에서

@Component
@RequiredArgsConstructor
@Slf4j
public class StudyPostApplyEventPublisher {

    private final RedissonClient redissonClient;
    private final StudyPostWriter studyPostWriter;
    private final StudyPostReader studyPostReader;

    @Timer
    public void acceptApplyEventPublish(Long studyPostId) {
        StudyPostEntity studyPost = studyPostReader.readById(studyPostId);
        RLock lock = redissonClient.getLock(studyPost.getId().toString());
        boolean available = false;
        try {
            available = lock.tryLock(10, 1, TimeUnit.SECONDS);
            if (!available) {
                throw new StudyApplyLockAcquisitionException();
                return;
            }
            studyPostWriter.updateStudyPostApply(studyPost.getId());
        } catch (InterruptedException e) {
            throw new StudyApplyLockAcquisitionException();
        } finally {
            if (available) {
                lock.unlock();
            }
        }

    }
}
  • RLock lock = redissonClient.getLock(studyPost.getId().toString()); -> studyPostId를 문자열로 변환하여 락 객체를 생성한다. 이 ID는 Redis에서 락을 구분하는 키로 사용된다

  • available = lock.tryLock(10, 1, TimeUnit.SECONDS); -> 락 획득을 시도한다
    첫번째 매개변수 10: 최대 대기 시간. 락을 얻기 위해 최대 10초 동안 기다린다
    두번째 매개변수 1: 락의 임대 시간. 락을 획득한 후 1초 동안 유지된다
    TimeUnit.SECONDS: 시간 단위로 초를 사용한다
    락을 획득하면 available이 true 그렇지 않으면 false

  • 락을 획득하지 못하면 예외 처리를 했다

그리고 업데이트 하는 로직은

@Component
@RequiredArgsConstructor
@Slf4j
public class StudyPostWriter {

    private final StudyPostRepository studyPostRepository;
    private final StudyPostReader studyPostReader;

    @Transactional
    public void updateStudyPostApply(Long studyPostId) {
        StudyPostEntity studyPost = studyPostRepository.findById(studyPostId).orElseThrow(PostNotFoundException::new);
        studyPost.decreaseRemainingSeat();
        studyPost.closeStudyPostIfRemainingSeatIsZero();
        studyPostRepository.saveAndFlush(studyPost);
    }

}

여기 잘 해놨다

    @Test
    void 동시에_100개의_요청의_스터디_지원서가_수락되면_게시글의_잔여석이_줄어든다() throws InterruptedException {
        // given
        Long postedUserId = 1L;
        StudyPostEntity post = StudyPostEntityFixture.SQLD.studyPostEntity_생성(postedUserId);
        StudyPostEntity savedPost = studyPostRepository.saveAndFlush(post);
        // when
        int threadCount = 100;
        ExecutorService executorService = newFixedThreadPool(32);
        CountDownLatch latch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    studyPostApplyEventPublisher.acceptApplyEventPublish(post.getStudyId());
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();

        // then
        StudyPostEntity actualStudyPost = studyPostRepository.findById(savedPost.getId()).orElseThrow();
        // 100 - (1 * 100) = 0이 되어야 함
        assertEquals(0, actualStudyPost.getRemainingSeat());

    }

테스트 코드도 잘 통과한다

락 획득하는 내부 코드 한번 까보자
근데 pub/sub을 알아서 redisson 내부에서 해주는건가? 처음에 얼마나 많은 스레드가 실패해서 대기중인게 궁금해서
AtomicLong을 스태틱으로 넣어놓고
락 획득 실패한 부분에서 fail++ 를 하여서 몇번 실패했는지 알아봤는데 0번이 나왔다... 어??
아 그러면
available = lock.tryLock(10, 1, TimeUnit.SECONDS);
이부분에서 알아서 락을 pub/sub으로 락을 획득하고 끝끝내 실패한 경우에 락 획득 실패 부분으로 가는구나...

tryLock 라이브러리 코드 까보자
RedissonLock.class

    public boolean tryLock() {
        return (Boolean)this.get(this.tryLockAsync());
    }

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
    }

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
        if (ttl == null) {
            return true;
        } else {
            time -= System.currentTimeMillis() - current;
            if (time <= 0L) {
                this.acquireFailed(waitTime, unit, threadId);
                return false;
            } else {
                current = System.currentTimeMillis();
                CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);

                try {
                    subscribeFuture.get(time, TimeUnit.MILLISECONDS);
                } catch (TimeoutException | ExecutionException var20) {
                    if (!subscribeFuture.cancel(false)) {
                        subscribeFuture.whenComplete((res, ex) -> {
                            if (ex == null) {
                                this.unsubscribe(res, threadId);
                            }

                        });
                    }

                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                try {
                    time -= System.currentTimeMillis() - current;
                    if (time <= 0L) {
                        this.acquireFailed(waitTime, unit, threadId);
                        boolean var22 = false;
                        return var22;
                    } else {
                        boolean var16;
                        do {
                            long currentTime = System.currentTimeMillis();
                            ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                            if (ttl == null) {
                                var16 = true;
                                return var16;
                            }

                            time -= System.currentTimeMillis() - currentTime;
                            if (time <= 0L) {
                                this.acquireFailed(waitTime, unit, threadId);
                                var16 = false;
                                return var16;
                            }

                            currentTime = System.currentTimeMillis();
                            if (ttl >= 0L && ttl < time) {
                                ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                            } else {
                                ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                            }

                            time -= System.currentTimeMillis() - currentTime;
                        } while(time > 0L);

                        this.acquireFailed(waitTime, unit, threadId);
                        var16 = false;
                        return var16;
                    }
                } finally {
                    this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
                }
            }
        }
    }

    protected CompletableFuture<RedissonLockEntry> subscribe(long threadId) {
        return this.pubSub.subscribe(this.getEntryName(), this.getChannelName());
    }

    protected void unsubscribe(RedissonLockEntry entry, long threadId) {
        this.pubSub.unsubscribe(entry, this.getEntryName(), this.getChannelName());
    }

    public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
        return this.tryLock(waitTime, -1L, unit);
    }

    public void unlock() {
        try {
            this.get(this.unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException var2) {
            RedisException e = var2;
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException)e.getCause();
            } else {
                throw e;
            }
        }
    }

비동기적으로 락을 시도하고, 결과를 동기적으로 반환
tryLockAsync() 메소드를 호출하여 비동기적으로 락을 시도하고, 그 결과를 get() 메소드를 통해 동기적으로 반환

tryLockInnerAsync여기서는
Redis Lua 스크립트를 실행하여 락을 시도하고
락이 없으면(redis.call('exists', KEYS[1]) == 0), 락을 설정하고(hincrby), 만료 시간을 설정(pexpire).
현재 스레드가 이미 락을 가지고 있으면(redis.call('hexists', KEYS[1], ARGV[2]) == 1), 락 횟수를 증가시키고(hincrby), 만료 시간을 갱신한다(pexpire).
그렇지 않으면 현재 TTL(Time To Live)을 반환한다(pttl).

가장 중요한 tryLock 메서드는
지정된 대기 시간(waitTime) 동안 락을 시도하고, 락을 획득한 후 지정된 시간(leaseTime) 동안 락을 유지한다

    Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) {
        return true;
    }

ttl: tryAcquire 메소드를 호출하여 락을 시도 락을 획득하면 null을 반환 락을 획득한 경우 true를 반환하고 메소드를 종료한다

    time -= System.currentTimeMillis() - current;
    if (time <= 0L) {
        this.acquireFailed(waitTime, unit, threadId);
        return false;
    }

락을 획득하지 못한 경우, 남은 대기 시간을 계산 남은 시간이 없으면 acquireFailed 메소드를 호출하고 false를 반환

    current = System.currentTimeMillis();
    CompletableFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);

    try {
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (TimeoutException | ExecutionException var20) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.whenComplete((res, ex) -> {
                if (ex == null) {
                    this.unsubscribe(res, threadId);
                }
            });
        }
        this.acquireFailed(waitTime, unit, threadId);
        return false;
    }

락 이벤트를 구독하여 락 해제를 기다린다
구독이 완료될 때까지 남은 시간 동안 기다린다
TimeoutException 또는 ExecutionException이 발생하면 구독을 취소하고, acquireFailed 메소드를 호출하여 false를 반환한다

    try {
        time -= System.currentTimeMillis() - current;
        if (time <= 0L) {
            this.acquireFailed(waitTime, unit, threadId);
            boolean var22 = false;
            return var22;
        } else {
            boolean var16;
            do {
                long currentTime = System.currentTimeMillis();
                ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                if (ttl == null) {
                    var16 = true;
                    return var16;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0L) {
                    this.acquireFailed(waitTime, unit, threadId);
                    var16 = false;
                    return var16;
                }

                currentTime = System.currentTimeMillis();
                if (ttl >= 0L && ttl < time) {
                    ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
            } while(time > 0L);

            this.acquireFailed(waitTime, unit, threadId);
            var16 = false;
            return var16;
        }
    } finally {
        this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
    }

이제 반복적으로 락 획득을 시도
tryAcquire 메소드를 호출하여 락을 시도하고, null을 반환하면 락을 획득
남은 시간이 없으면 acquireFailed 메소드를 호출하고 false를 반환
락을 획득하지 못하면 이벤트를 기다리고 다시 시도한다
최종적으로 락을 해제 unsubscribe

결론적으로 간단하게 정리하면
1. tryAcquire를 이용하여 락을 시도하고 획득되면 true 반환
2. 락 획득에 실패했을 때, ttl, 대기시간이 남아있으면 그 스레드는 락이 사용 가능하다는 알림을 받는 채널을 구독
3. 그 스레드는 락이 사용 가능하다는 메시지가 도착할때 까지 대기 대기하다가 ttl이 오버되면 즉, TimeoutException 또는 ExecutionException이 발생하면 구독을 취소하고, acquireFailed 메소드를 호출하여 false를 반환한다
4. 성공적으로 락을 획득하거나, 시간이 초과되서 실패하면 구독 해지

오호 이런 내부 구조를 갖고있다

참고:
https://www.inflearn.com/questions/625844/%EC%84%A0%EC%83%9D%EB%8B%98%EC%9D%B4-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-spring-%EB%B2%84%EC%A0%84-%EA%B8%B0%EC%A4%80%EC%9C%BC%EB%A1%9C-%EC%8B%A4%EB%AC%B4-%ED%86%B5%ED%95%B4-redisson-%EC%A0%81%EC%9A%A9%EC%8B%9C-%EB%AC%B8%EC%A0%9C

https://github.com/springfox/springfox/issues/3462#issuecomment-979548234

0개의 댓글