[Spring Data]JPA를 활용한 DataBase Lock & Redis Lock 적용

YouMakeMeSmile·2024년 1월 10일
1
post-thumbnail

서버 개발자라면 피할수 없는 문제중 동시성이 존재한다. 요즘과 같은 K8S 환경에서는 아무리 어플리케이션에서 동시성 발생을 위한 구현을 하여도 다른 POD에서 같은 데이터에 접근하는 충돌에 대해서는 방법이 없다.

이러한 경우 DataBase 또는 Redis에서 제공하는 Lock를 활용하여 데이터의 동시성 접근을 제어하는 방법을 이용해야 한다.
DataBase Lock를 이용할 경우 추가적인 인프라 구성요소 없이 동시성을 해결할 수 있다는 장점이 있으나 Lock 획득을 위해 Waiting되는 DataBase Connection 증가로 인해 부하가 발생할 수 있다.
Redis를 이용하여 Distributed Lock를 사용할 경우 DataBase Connection 증가는 방지 할 수 있지만 Redis의 관리가 필요하다.


DataBase Lock

우선은 DataBase Lock의 구현 방법에 대해서 정리를 해보겠다. DataBase Lock 방법에는 JPA를 활용한 Optimistic Lock 방식과 Pessimistic Lock방식이 존재한다.
Optimistic Lock방식은 어플리케이션 내부에서 처리되는 방식으로 INSERT,UPDATE, DELETE가 flush 되는 시점에서 version 속성의 값을 비교하는 방식이기 때문에 로직이 수행되며 retry 구현이 필요하게 된다.
구현은 Spring Data Jpa를 통해 Pessimistic Lock를 활용하여 구현을 진행을 하겠다.


구현 내용은 상품을 등록하고 해당 상품의 재고를 하나씩 줄이는 시나리오이다.

@Entity
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class Item {
    @Id
    private String id;
    private String name;
    private int count;
    @Version
    private long version;
}

Optimistic Lock

우선은 Optimistic Lock으로만 로직이 구성되어 있는 경우를 확인해보겠다.

public void init() {
    itemRepository.deleteAll();
    itemRepository.save(new Item("TEST", "상품", 1000, 0l));
}

public void test1() {
    itemRepository.findById("TEST").map(value -> {
        value.setCount(value.getCount() - 1);
        return value;
    }).ifPresent(value -> itemRepository.save(value));
}

ItemRepository.findById 메소드는 JpaRepository에서 기본 정의되어 있는 QueryMethod를 활용한 것이며 JMeter를 활용하여 test1() 메소드를 1000번 실행시켜보겠다.

ERROR 45528 --- [o-8080-exec-167] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.springframework.orm.ObjectOptimisticLockingFailureException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [io.velog.youmakemesmile.lock.Item#TEST]] with root cause
org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [io.velog.youmakemesmile.lock.Item#TEST]

1000번을 실행 결과 66%의 예외가 발생하였다. 예외는 ObjectOptimisticLockingFailureException으로 Optimistic Lock 충돌이 발생한것이다.


Pessimistic Lock

이번에는 Pessimistic Lock를 적용하여 동시성 문제를 해결해보겠다. 대표적인 Row LevelLock에는 Shared Lock(sLock)Exclusive Lock(xLock)이 존재한다. 자세한 내용은 MySQL문서를 참고하면 된다.

현재 시나리오에서 사용해야되는 LockExclusive Lock(xLock)이며 Spring Data JPA에서는 ockModeType.PESSIMISTIC_WRITE에 해당한다.

public interface ItemRepository extends JpaRepository<Item, String> {
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @QueryHints({@QueryHint(name = "jakarta.persistence.lock.timeout", value = "3000")})
    Optional<Item> findById(String id);
}

QueryMethod에서는 @Lock를 통해 해당 쿼리가 실행될때 Lock를 획득하는 쿼리가 추가되며 @QueryHintsLock TimeOut을 통해 대기 시간을 설정할 수 있다. 이렇게 추가하여 기존과 동일한 JMeter를 실행시켜보겠다.

Hibernate: select i1_0.id,i1_0.count,i1_0.name,i1_0.version from item i1_0 where i1_0.id=? for update wait 3
Hibernate: update item set count=?,name=?,version=? where id=? and version=?

1000번을 실행 결과 예외가 발생하지 않았으며 쿼리 로그에도 for update wait 3가 추가된것을 볼 수 있다.
위와 같이 QueryMethodAnnotation을 추가하는 방식 외에도 EntityManager를 이용하는 방법도 있다.


public void test1() {
    Item test = itemRepository.findById("TEST").get();
    entityManager.lock(test, LockModeType.PESSIMISTIC_WRITE);
    test.setCount(test.getCount() - 1);
    itemRepository.save(test);
}

위와 같이 기본 JpaRepository.findById 조회이후 해당 EntityEntityManager.lock를 통해서 Lock를 획득하면 될것 같지만 결과는 다음과 같다.

Hibernate: select i1_0.id,i1_0.count,i1_0.name,i1_0.version from item i1_0 where i1_0.id=?
Hibernate: select id from item where id=? and version=? for update
Hibernate: update item set count=?,name=?,version=? where id=? and version=?

로그와 같이 조회이후에 다시 해당 Entity의 조회를 통해 Lock를 획득하기 때문에 중간에 Lock이 없는 시간이 존재하게된다.

위의 로직을 다음과 같이 변경한다면 QueryMethod와 동일하게 동작하게된다.

public void test1() throws InterruptedException {
    Item test = entityManager.find(Item.class, "TEST", LockModeType.PESSIMISTIC_WRITE);
    test.setCount(test.getCount() - 1);
    itemRepository.save(test);
}

Hibernate: select i1_0.id,i1_0.count,i1_0.name,i1_0.version from item i1_0 where i1_0.id=? for update
Hibernate: update item set count=?,name=?,version=? where id=? and version=?

위와 같이 예외는 발생하지 않으며 쿼리도 동일하게 QueryMethod와 동일하게 실행된다.

지금까지는 DataBase만을 활용하여 동시성을 해결하는 방법에 대해 정리하였고 다음은 Redis를 활용하여 Distributed Lock를 구현하는 방법에 대해 정리하겠다.


Redis Distributed Lock

Redis문서에서 Distributed Lock를 위해 RedLock이라는 알고리즘 제시하고 있으며 Java에서의 구현된 라이브러리는 Redisson이 존재한다.

Redisson에서는 spring boot stater를 제공하고 있으며 여기에서 확인하여 버전의 의존성을 추가하면되며 내부에 spring-boot-starter-data-redis 의존성도 같이 존재하여 따로 추가할 필요 없다.

Redisson Config

기본적으로 Redis와 통신하기 위한 RedisClientRedisConnectionFactory를 생성해야 한다.
Redisson에서는 RedissonClientRedissonConnectionFactoryBean으로 생성해야 한다. 아래의 예시는 Redis서버를 하나 구성하였을 경우이며 Redis 구성에 따른 설정은 다음 문서를 확인하면 된다.

@Configuration
public class RedisConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://localhost:6379");
        return Redisson.create(config);
    }

    @Bean
    public RedissonConnectionFactory redisConnectionFactory(RedissonClient redissonClient) {
        return new RedissonConnectionFactory(redissonClient);
    }
}

Redisson Distributed Lock

Redisson를 활용한 Distributed Lock 사용 방식은 간단하며 다음 문서를 참고하면 된다.

  1. RedissonClient를 통해 해당 Lockkey를 설정하여 RLock 생성
  2. RLock.lock 또는 RLock.tryLock를 통해 Lock 획득이 가능하며 waitTimeleaseTime 설정 가능
    • waitTime: Lock 획득 대기 시간
    • leaseTime: Lock 획득 이후 소유 시간
  3. RLock.unlock를 통해 Lock 반납
@Transactional
public Item test1() {
    RLock lock = redissonClient.getLock("TEST");
    try {
        if (lock.tryLock(5000, 3000, TimeUnit.MILLISECONDS)) {
            Item item = itemRepository.findById("TEST").get();
            item.setCount(item.getCount() - 1);
            itemRepository.save(item);
            return item;
        } else {
            throw new RuntimeException();
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        if (lock.isLocked() && lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}

문제 경우1: Optimistic Lock

다음 경우는 Optimistic Lock을 적용한 Entity의 실행 결과이다.

ERROR 7343 --- [io-8080-exec-16] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.springframework.orm.ObjectOptimisticLockingFailureException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [io.velog.youmakemesmile.lock.Item#TEST]] with root cause
org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [io.velog.youmakemesmile.lock.Item#TEST]

실행 결과 예외가 발생한 경우가 있으며 ObjectOptimisticLockingFailureException가 발생했다.

문제 경우2: Optimistic Lock 제거

다음 경우는 Optimistic Lock를 적용하지 않은 즉 Entity에서 @Version 속성이 없는 실행 결과 이다.

실행 결과 예외는 하나는 발생되지 않았지만 count가 15가 남아있는게 확인되었다.

문제 이유

무엇 때문에 예상과 다르게 Distributed Lock이 동작된 것일까?
문제는 Spring Transaction CommitDistributed Lock 반납 시점차가 존재하는 것이다.

위에 그림과 같이 Distributed Lock 반납Spring Transaction Commit보다 먼저 실행되어 해당 시점에 다른 요청이 있는 경우 Commit 이전의 데이터를 조회하여 예상과 다른 결과가 나타나게 되며 다음 로그를 통해서도 수행 순서를 확인해 볼 수 있다.

해결 방안1: @TransactionalEventListener

이전글 [Spring Kafka]Kafka Producer 정리 근데 이제 실제 업무와 Transaction을 곁들인에서 다루었던 Kafka MessageSpring Transaction 수행 결과에 따라 발행하기 위하여 사용했던 방식인 @TransactionalEventListener를 활용하여 Distributed Lock 반납Spring Transaction 이후에 실행할 수 있다.

@Transactional
public Item test1() {
    RLock lock = redissonClient.getLock("TEST");
    try {
        if (lock.tryLock(5000, 3000, TimeUnit.MILLISECONDS)) {
            Item item = itemRepository.findById("TEST").get();
            item.setCount(item.getCount() - 1);
            itemRepository.save(item);
            return item;
        } else {
            throw new RuntimeException();
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        if (lock.isLocked() && lock.isHeldByCurrentThread()) {
            applicationEventPublisher.publishEvent(lock);
        }
    }
}

@EventListener
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void test3(RLock rLock) {
    rLock.unlock();
}

위와 같이 로직을 구현하고 JMeter를 활용하여 100번을 실행한 결과 예외가 발생하지 않고 동시성 문제없이 정상적으로 수행되었다.

다음은 해당 로직의 흐름과 이에 대한 실행 로그이다.

결과적으로 흐림 순서는 다음과 같다.

Tx#1 시작 -> Distributed Lock 획득 -> 로직 -> Tx#1 종료 -> Tx#2 시작 -> Distributed Lock 반납 -> Tx#2 종료

위와 같이 핵심은 Tx 종료 이후에 Distributed Lock 반납이 이루어져야 한다는 것이다.

해결 방안2: Spring Transaction 분리

다음 방식은 해결 방안1과 비슷한 방식이지만 @TransactionalEventListener를 통한 Transaction 분리가 아닌 @Transactionalpropagation = Propagation.REQUIRES_NEW를 활용하여 분리하는 방식이다.

@Transactional
public Item test1() {
    RLock lock = redissonClient.getLock("TEST");
    try {
        if (lock.tryLock(5000, 3000, TimeUnit.MILLISECONDS)) {

            return testService.test();
        } else {
            throw new RuntimeException();
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        if (lock.isLocked() && lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}
@Service
@RequiredArgsConstructor
public class TestService {
    private final ItemRepository itemRepository;

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Item test(){
        Item item = itemRepository.findById("TEST").get();
        item.setCount(item.getCount() - 1);
        itemRepository.save(item);
        return item;
    }
}

위와 같이 로직을 구현하고 동일하게 JMeter를 활용하여 100번을 실행하며 동시성 문제없이 정상적으로 수행된다.

다음은 해당 로직의 흐름과 이에 대한 실행 로그이다.

해당 방식 결과의 흐름은 다음과 같다.

Tx#1 시작 -> Distributed Lock 획득 -> Tx#2 시작 -> 로직 -> Tx#2 종료 -> Distributed Lock 반납 -> Tx#1 종료

🚨주의점🚨

해결 방안1해결 방안2의 예제는 아주 간단한 Transaction 로직을 예시로 구성한 경우이다.
예제의 두 방안의 결과는 같지만 실제 업무에 두 방안을 적용할 때에는 해당 업무의 로직에 따라 Transaction을 고려하여 적용하여야 한다.

해결 방안1의 경우에는 기존 Transaction에서 ApplicationEvent를 이용하는 방식으로 Distributed Lock 획득 이후의 Transaction을 기존과 동일한 Transaction으로 포함 실킬 수 있다.

해결 방안2의 경우에는 반드시 Distributed Lock 획득 이후의 로직이 항상 새로운 Transaction에서 실행되므로 Distributed Lock 획득 이전과 Distributed Lock 획득 이후에 로직이 있는경우 해당 사항을 고려해야 한다.

해결 방안1 예제

@Transactional
public Item test1() {
    RLock lock = redissonClient.getLock("TEST");
    try {
        if (lock.tryLock(5000, 3000, TimeUnit.MILLISECONDS)) {
            Item item = itemRepository.findById("TEST").get();
            item.setCount(item.getCount() - 1);
            itemRepository.save(item);

            return testService.test();
        } else {
            throw new RuntimeException();
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        if (lock.isLocked() && lock.isHeldByCurrentThread()) {
            applicationEventPublisher.publishEvent(lock);
        }
    }
    Item item = itemRepository.findById("TEST").get();
    log.info("Count : {}", item.getCount());
}
@Service
@RequiredArgsConstructor
public class TestService {
    private final ItemRepository itemRepository;

    @Transactional
    public Item test(){
        Item item = itemRepository.findById("TEST").get();
        item.setCount(item.getCount() - 1);
        itemRepository.save(item);
        return item;
    }
}
INFO 23282 --- [nio-8080-exec-7] i.v.youmakemesmile.lock.TestController   : Count : -100

초기 Count의 값은 100을 설정하였으며 -2를 수행하는 로직을 100번 실행한 결과이다.
마지막 로그의 Count-100를 출력한 것을 확인할 수 있다. 즉 test1()test() 메소드가 모두 같은 Transaction에 포함되어 실행 된다는것을 알 수 있다.

해결 방안2 예제

@Transactional
public Item test1() {
    RLock lock = redissonClient.getLock("TEST");
    try {
        if (lock.tryLock(5000, 3000, TimeUnit.MILLISECONDS)) {
            Item item = itemRepository.findById("TEST").get();
            item.setCount(item.getCount() - 1);
            itemRepository.save(item);
            testService.test();
        } else {
            throw new RuntimeException();
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        if (lock.isLocked() && lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
    Item item = itemRepository.findById("TEST").get();
    log.info("Count : {}", item.getCount());
    return item;
}
@Service
@RequiredArgsConstructor
public class TestService {
    private final ItemRepository itemRepository;

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Item test(){
        Item item = itemRepository.findById("TEST").get();
        item.setCount(item.getCount() - 1);
        itemRepository.save(item);
        return item;
    }
}
INFO 23426 --- [io-8080-exec-14] i.v.youmakemesmile.lock.TestController   : Count : 0

해결 방안1과 같이 초기 값 100에서 로직을 100번 실행한 결과이다.
마지막 로그의 Count가 0를 출력한 것을 확인할 수 있다. 사실 당연한 결과로 Distributed Lock은 해당 부분을 하나의 Thread에서만 실행 가능하도록 Lock를 잡아주는 것이지 Transaction과는 연관이 없다.


이처럼 Distributed Lock을 적용 할 경우 해당 로직의 Transaction를 고려하여 방식을 적용하고 로직을 구현해야 할 것이다.

Annotation & Aop 적용

위의 해결 방안1해결 방안2Distributed Lock 획득과 반납으로 인한 BolilerPlate 코드가 존재하게 된다.
이러한 로직을 AnnotationAop를 활용하여 정리가능하다.

Annotation

Annotation에는 Distributed Lock 획득을 위한 key, waitTime, leaseTime, timeUnit를 정의한다.

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedLock {
    String key();
    long waitTime() default 10000L;
    long leaseTime() default 3000L;
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}

해결 방안1: Aop 전환

Aspect@Around에서 @RedLockPointCut으로 설정하여 해당 JoinPoint가 실행하기 이전에 tryLock를 통해 Lock 획득을 시도하며 이후에 finally에서 ApplicationEvent를 발행하는 로직이다.

@Aspect
@Component
@RequiredArgsConstructor
public class RedLockAop {
    private final RedissonClient redissonClient;
    private final ApplicationEventPublisher applicationEventPublisher;

    @Around("@annotation(io.velog.youmakemesmile.lock.RedLock)")
    public Object lock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
        RedLock redLock = signature.getMethod().getAnnotation(RedLock.class);

        RLock lock = redissonClient.getLock(getDynamicValue(signature.getParameterNames(), proceedingJoinPoint.getArgs(), redLock.key()).toString());
        try {
            if (lock.tryLock(redLock.waitTime(), redLock.leaseTime(), redLock.timeUnit())) {
                return proceedingJoinPoint.proceed();
            } else {
                throw new RuntimeException();
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if (lock.isLocked() && lock.isHeldByCurrentThread()) {
                applicationEventPublisher.publishEvent(lock);
            }
        }
    }

    public static Object getDynamicValue(String[] parameterNames, Object[] args, String key) {
        ExpressionParser parser = new SpelExpressionParser();
        StandardEvaluationContext context = new StandardEvaluationContext();
        for (int i = 0; i < parameterNames.length; i++) {
            context.setVariable(parameterNames[i], args[i]);
        }
        return parser.parseExpression(key).getValue(context, Object.class);
    }
    
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
    public void test3(RLock rLock) {
        rLock.unlock();
    }
}
@Transactional
@RedLock(key = "'TEST'")
public Item test1() {
    Item item = itemRepository.findById("TEST").get();
    item.setCount(item.getCount() - 1);
    return itemRepository.save(item);
}

해결 방안2: Aop 전환

해결 방안1의 로직과 거의 일치하며 다른 부분은 Transaction 분리를 위해서 Service를 만들어 해당 메소드에 Propagation.REQUIRES_NEW를 정의하고 해당 메소드를 호출해야 한다는 점이다.

@Aspect
@Component
@RequiredArgsConstructor
public class RedLockAop {
    private final RedissonClient redissonClient;
    private final RequiresNewService requiresNewService;
    private final ApplicationEventPublisher applicationEventPublisher;

    @Around("@annotation(io.velog.youmakemesmile.lock.RedLock)")
    public Object lock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
        RedLock redLock = signature.getMethod().getAnnotation(RedLock.class);

        RLock lock = redissonClient.getLock(getDynamicValue(signature.getParameterNames(), proceedingJoinPoint.getArgs(), redLock.key()).toString());
        try {
            if (lock.tryLock(redLock.waitTime(), redLock.leaseTime(), redLock.timeUnit())) {
                return requiresNewService.proceed(proceedingJoinPoint);
            } else {
                throw new RuntimeException();
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if (lock.isLocked() && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
    public static Object getDynamicValue(String[] parameterNames, Object[] args, String key) {
        ExpressionParser parser = new SpelExpressionParser();
        StandardEvaluationContext context = new StandardEvaluationContext();
        for (int i = 0; i < parameterNames.length; i++) {
            context.setVariable(parameterNames[i], args[i]);
        }
        return parser.parseExpression(key).getValue(context, Object.class);
    }
}
@Service
public class RequiresNewService {
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Object proceed(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        return proceedingJoinPoint.proceed();
    }
}

🚨Hikari Connection Pool & Tomcat Max Worker Threads🚨

해결 방안1해결 방안2를 테스트하다 요청 수를 증가시키면 다음과 같은 Time Out이 발생하는 경우가 존재하였다.

HikariPool-1 - Connection is not available, request timed out after 30008ms.

발생 이유는 Hikari Conntection Pool SizeTomcat Max Worker Thread의 수에 따른 Hikari DeadLock이 발생한 경우였다.

위와 같은 순서에 의해서 Hikari DeadLock 상태가 발생하고 Distributed Lock 획득 대기를 하고 있는 ThreadConnection들과 Distributed Lock은 획득 했으나 새로운 Connection 획득을 대기하는 Connection들에서 TimeOut이 발생하는 것이다.

해결 방안

해당 문제의 해결 방안은 정답이 있을 수는 없다고 생각한다. 지금같은 경우는 조건이 모든 요청이 같은 요청이라는 경우였다.
하지만 실제 업무에서는 다른 요청도 동시에 발생할것이라 다른 로직에서 몇개의 Connection를 사용할지는 모른다.

위와 같은 경우라면 Hikari Database Connection Pool의 개수를 Worker Thread Pool의 개수보다 여유롭게 설정하면 해결된다.

그런데 그렇다고 Hikari Connection의 개수를 증가 시킨다면 해당 로직에서는 Connection만 획득하고 Distributed Lock 획득 대기를 하는 Thread만 증가하여 의미 없는 Connection만 증가하고 Database의 부하만 발생한다.

결국 해당 어플리케이션의 특성을 고려하여 적절히 Worker Thread Pool의 개수와 Hikari Connection Pool의 개수를 조절하여 사용해야 한다.

profile
알고싶고 하고싶은게 많은 주니어 개발자 입니다.

0개의 댓글