서버 개발자라면 피할수 없는 문제중 동시성
이 존재한다. 요즘과 같은 K8S
환경에서는 아무리 어플리케이션에서 동시성
발생을 위한 구현을 하여도 다른 POD
에서 같은 데이터에 접근하는 충돌에 대해서는 방법이 없다.
이러한 경우 DataBase
또는 Redis
에서 제공하는 Lock
를 활용하여 데이터의 동시성 접근을 제어하는 방법을 이용해야 한다.
DataBase Lock
를 이용할 경우 추가적인 인프라 구성요소 없이 동시성
을 해결할 수 있다는 장점이 있으나 Lock
획득을 위해 Waiting
되는 DataBase Connection
증가로 인해 부하가 발생할 수 있다.
Redis
를 이용하여 Distributed Lock
를 사용할 경우 DataBase Connection
증가는 방지 할 수 있지만 Redis
의 관리가 필요하다.
DataBase Lock
우선은 DataBase Lock
의 구현 방법에 대해서 정리를 해보겠다. DataBase Lock
방법에는 JPA
를 활용한 Optimistic Lock
방식과 Pessimistic Lock
방식이 존재한다.
Optimistic Lock
방식은 어플리케이션 내부에서 처리되는 방식으로 INSERT
,UPDATE
, DELETE
가 flush 되는 시점에서 version
속성의 값을 비교하는 방식이기 때문에 로직이 수행되며 retry
구현이 필요하게 된다.
구현은 Spring Data Jpa
를 통해 Pessimistic Lock
를 활용하여 구현을 진행을 하겠다.
구현 내용은 상품을 등록하고 해당 상품의 재고를 하나씩 줄이는 시나리오이다.
@Entity
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class Item {
@Id
private String id;
private String name;
private int count;
@Version
private long version;
}
우선은 Optimistic Lock
으로만 로직이 구성되어 있는 경우를 확인해보겠다.
public void init() {
itemRepository.deleteAll();
itemRepository.save(new Item("TEST", "상품", 1000, 0l));
}
public void test1() {
itemRepository.findById("TEST").map(value -> {
value.setCount(value.getCount() - 1);
return value;
}).ifPresent(value -> itemRepository.save(value));
}
ItemRepository.findById
메소드는 JpaRepository
에서 기본 정의되어 있는 QueryMethod
를 활용한 것이며 JMeter
를 활용하여 test1()
메소드를 1000번 실행시켜보겠다.
ERROR 45528 --- [o-8080-exec-167] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.springframework.orm.ObjectOptimisticLockingFailureException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [io.velog.youmakemesmile.lock.Item#TEST]] with root cause
org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [io.velog.youmakemesmile.lock.Item#TEST]
1000번을 실행 결과 66%의 예외가 발생하였다. 예외는 ObjectOptimisticLockingFailureException
으로 Optimistic Lock
충돌이 발생한것이다.
Pessimistic Lock
이번에는 Pessimistic Lock
를 적용하여 동시성
문제를 해결해보겠다. 대표적인 Row Level
의 Lock
에는 Shared Lock(sLock)
과 Exclusive Lock(xLock)
이 존재한다. 자세한 내용은 MySQL문서를 참고하면 된다.
현재 시나리오에서 사용해야되는 Lock
은 Exclusive Lock(xLock)
이며 Spring Data JPA
에서는 ockModeType.PESSIMISTIC_WRITE
에 해당한다.
public interface ItemRepository extends JpaRepository<Item, String> {
@Lock(LockModeType.PESSIMISTIC_WRITE)
@QueryHints({@QueryHint(name = "jakarta.persistence.lock.timeout", value = "3000")})
Optional<Item> findById(String id);
}
QueryMethod
에서는 @Lock
를 통해 해당 쿼리가 실행될때 Lock
를 획득하는 쿼리가 추가되며 @QueryHints
에 Lock TimeOut
을 통해 대기 시간을 설정할 수 있다. 이렇게 추가하여 기존과 동일한 JMeter
를 실행시켜보겠다.
Hibernate: select i1_0.id,i1_0.count,i1_0.name,i1_0.version from item i1_0 where i1_0.id=? for update wait 3
Hibernate: update item set count=?,name=?,version=? where id=? and version=?
1000번을 실행 결과 예외가 발생하지 않았으며 쿼리 로그에도 for update wait 3
가 추가된것을 볼 수 있다.
위와 같이 QueryMethod
에 Annotation
을 추가하는 방식 외에도 EntityManager
를 이용하는 방법도 있다.
public void test1() {
Item test = itemRepository.findById("TEST").get();
entityManager.lock(test, LockModeType.PESSIMISTIC_WRITE);
test.setCount(test.getCount() - 1);
itemRepository.save(test);
}
위와 같이 기본 JpaRepository.findById
조회이후 해당 Entity
를 EntityManager.lock
를 통해서 Lock
를 획득하면 될것 같지만 결과는 다음과 같다.
Hibernate: select i1_0.id,i1_0.count,i1_0.name,i1_0.version from item i1_0 where i1_0.id=?
Hibernate: select id from item where id=? and version=? for update
Hibernate: update item set count=?,name=?,version=? where id=? and version=?
로그와 같이 조회이후에 다시 해당 Entity
의 조회를 통해 Lock
를 획득하기 때문에 중간에 Lock
이 없는 시간이 존재하게된다.
위의 로직을 다음과 같이 변경한다면 QueryMethod
와 동일하게 동작하게된다.
public void test1() throws InterruptedException {
Item test = entityManager.find(Item.class, "TEST", LockModeType.PESSIMISTIC_WRITE);
test.setCount(test.getCount() - 1);
itemRepository.save(test);
}
Hibernate: select i1_0.id,i1_0.count,i1_0.name,i1_0.version from item i1_0 where i1_0.id=? for update
Hibernate: update item set count=?,name=?,version=? where id=? and version=?
위와 같이 예외는 발생하지 않으며 쿼리도 동일하게 QueryMethod
와 동일하게 실행된다.
지금까지는 DataBase
만을 활용하여 동시성
을 해결하는 방법에 대해 정리하였고 다음은 Redis
를 활용하여 Distributed Lock
를 구현하는 방법에 대해 정리하겠다.
Redis Distributed Lock
Redis
는 문서에서 Distributed Lock
를 위해 RedLock
이라는 알고리즘 제시하고 있으며 Java
에서의 구현된 라이브러리는 Redisson
이 존재한다.
Redisson
에서는 spring boot stater
를 제공하고 있으며 여기에서 확인하여 버전의 의존성을 추가하면되며 내부에 spring-boot-starter-data-redis
의존성도 같이 존재하여 따로 추가할 필요 없다.
Redisson Config
기본적으로 Redis
와 통신하기 위한 RedisClient
와 RedisConnectionFactory
를 생성해야 한다.
Redisson
에서는 RedissonClient
와 RedissonConnectionFactory
를 Bean
으로 생성해야 한다. 아래의 예시는 Redis
서버를 하나 구성하였을 경우이며 Redis
구성에 따른 설정은 다음 문서를 확인하면 된다.
@Configuration
public class RedisConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://localhost:6379");
return Redisson.create(config);
}
@Bean
public RedissonConnectionFactory redisConnectionFactory(RedissonClient redissonClient) {
return new RedissonConnectionFactory(redissonClient);
}
}
Redisson Distributed Lock
Redisson
를 활용한 Distributed Lock
사용 방식은 간단하며 다음 문서를 참고하면 된다.
RedissonClient
를 통해 해당 Lock
의 key
를 설정하여 RLock
생성RLock.lock
또는 RLock.tryLock
를 통해 Lock
획득이 가능하며 waitTime
과 leaseTime
설정 가능waitTime
: Lock
획득 대기 시간leaseTime
: Lock
획득 이후 소유 시간RLock.unlock
를 통해 Lock
반납@Transactional
public Item test1() {
RLock lock = redissonClient.getLock("TEST");
try {
if (lock.tryLock(5000, 3000, TimeUnit.MILLISECONDS)) {
Item item = itemRepository.findById("TEST").get();
item.setCount(item.getCount() - 1);
itemRepository.save(item);
return item;
} else {
throw new RuntimeException();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
Optimistic Lock
다음 경우는 Optimistic Lock
을 적용한 Entity
의 실행 결과이다.
ERROR 7343 --- [io-8080-exec-16] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.springframework.orm.ObjectOptimisticLockingFailureException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [io.velog.youmakemesmile.lock.Item#TEST]] with root cause
org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [io.velog.youmakemesmile.lock.Item#TEST]
실행 결과 예외가 발생한 경우가 있으며 ObjectOptimisticLockingFailureException
가 발생했다.
Optimistic Lock
제거다음 경우는 Optimistic Lock
를 적용하지 않은 즉 Entity
에서 @Version
속성이 없는 실행 결과 이다.
실행 결과 예외는 하나는 발생되지 않았지만 count
가 15가 남아있는게 확인되었다.
무엇 때문에 예상과 다르게 Distributed Lock
이 동작된 것일까?
문제는 Spring Transaction Commit
과 Distributed Lock 반납
시점차가 존재하는 것이다.
위에 그림과 같이 Distributed Lock 반납
이 Spring Transaction Commit
보다 먼저 실행되어 해당 시점에 다른 요청이 있는 경우 Commit
이전의 데이터를 조회하여 예상과 다른 결과가 나타나게 되며 다음 로그를 통해서도 수행 순서를 확인해 볼 수 있다.
@TransactionalEventListener
이전글 [Spring Kafka]Kafka Producer 정리 근데 이제 실제 업무와 Transaction을 곁들인에서 다루었던 Kafka Message
를 Spring Transaction
수행 결과에 따라 발행하기 위하여 사용했던 방식인 @TransactionalEventListener
를 활용하여 Distributed Lock 반납
을 Spring Transaction
이후에 실행할 수 있다.
@Transactional
public Item test1() {
RLock lock = redissonClient.getLock("TEST");
try {
if (lock.tryLock(5000, 3000, TimeUnit.MILLISECONDS)) {
Item item = itemRepository.findById("TEST").get();
item.setCount(item.getCount() - 1);
itemRepository.save(item);
return item;
} else {
throw new RuntimeException();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
applicationEventPublisher.publishEvent(lock);
}
}
}
@EventListener
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void test3(RLock rLock) {
rLock.unlock();
}
위와 같이 로직을 구현하고 JMeter
를 활용하여 100
번을 실행한 결과 예외가 발생하지 않고 동시성
문제없이 정상적으로 수행되었다.
다음은 해당 로직의 흐름과 이에 대한 실행 로그이다.
결과적으로 흐림 순서는 다음과 같다.
Tx#1 시작
->Distributed Lock 획득
->로직
->Tx#1 종료
->Tx#2 시작
->Distributed Lock 반납
->Tx#2 종료
위와 같이 핵심은 Tx 종료
이후에 Distributed Lock 반납
이 이루어져야 한다는 것이다.
Spring Transaction
분리다음 방식은 해결 방안1
과 비슷한 방식이지만 @TransactionalEventListener
를 통한 Transaction
분리가 아닌 @Transactional
의 propagation = Propagation.REQUIRES_NEW
를 활용하여 분리하는 방식이다.
@Transactional
public Item test1() {
RLock lock = redissonClient.getLock("TEST");
try {
if (lock.tryLock(5000, 3000, TimeUnit.MILLISECONDS)) {
return testService.test();
} else {
throw new RuntimeException();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
@Service
@RequiredArgsConstructor
public class TestService {
private final ItemRepository itemRepository;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public Item test(){
Item item = itemRepository.findById("TEST").get();
item.setCount(item.getCount() - 1);
itemRepository.save(item);
return item;
}
}
위와 같이 로직을 구현하고 동일하게 JMeter
를 활용하여 100번을 실행하며 동시성
문제없이 정상적으로 수행된다.
다음은 해당 로직의 흐름과 이에 대한 실행 로그이다.
해당 방식 결과의 흐름은 다음과 같다.
Tx#1 시작
->Distributed Lock 획득
->Tx#2 시작
->로직
->Tx#2 종료
->Distributed Lock 반납
->Tx#1 종료
해결 방안1
과 해결 방안2
의 예제는 아주 간단한 Transaction
로직을 예시로 구성한 경우이다.
예제의 두 방안의 결과는 같지만 실제 업무에 두 방안을 적용할 때에는 해당 업무의 로직에 따라 Transaction
을 고려하여 적용하여야 한다.
해결 방안1
의 경우에는 기존 Transaction
에서 ApplicationEvent
를 이용하는 방식으로 Distributed Lock 획득
이후의 Transaction
을 기존과 동일한 Transaction
으로 포함 실킬 수 있다.
해결 방안2
의 경우에는 반드시 Distributed Lock 획득
이후의 로직이 항상 새로운 Transaction
에서 실행되므로 Distributed Lock 획득
이전과 Distributed Lock 획득
이후에 로직이 있는경우 해당 사항을 고려해야 한다.
@Transactional
public Item test1() {
RLock lock = redissonClient.getLock("TEST");
try {
if (lock.tryLock(5000, 3000, TimeUnit.MILLISECONDS)) {
Item item = itemRepository.findById("TEST").get();
item.setCount(item.getCount() - 1);
itemRepository.save(item);
return testService.test();
} else {
throw new RuntimeException();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
applicationEventPublisher.publishEvent(lock);
}
}
Item item = itemRepository.findById("TEST").get();
log.info("Count : {}", item.getCount());
}
@Service
@RequiredArgsConstructor
public class TestService {
private final ItemRepository itemRepository;
@Transactional
public Item test(){
Item item = itemRepository.findById("TEST").get();
item.setCount(item.getCount() - 1);
itemRepository.save(item);
return item;
}
}
INFO 23282 --- [nio-8080-exec-7] i.v.youmakemesmile.lock.TestController : Count : -100
초기 Count의 값은 100
을 설정하였으며 -2
를 수행하는 로직을 100번 실행한 결과이다.
마지막 로그의 Count
가 -100
를 출력한 것을 확인할 수 있다. 즉 test1()
와 test()
메소드가 모두 같은 Transaction
에 포함되어 실행 된다는것을 알 수 있다.
@Transactional
public Item test1() {
RLock lock = redissonClient.getLock("TEST");
try {
if (lock.tryLock(5000, 3000, TimeUnit.MILLISECONDS)) {
Item item = itemRepository.findById("TEST").get();
item.setCount(item.getCount() - 1);
itemRepository.save(item);
testService.test();
} else {
throw new RuntimeException();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
Item item = itemRepository.findById("TEST").get();
log.info("Count : {}", item.getCount());
return item;
}
@Service
@RequiredArgsConstructor
public class TestService {
private final ItemRepository itemRepository;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public Item test(){
Item item = itemRepository.findById("TEST").get();
item.setCount(item.getCount() - 1);
itemRepository.save(item);
return item;
}
}
INFO 23426 --- [io-8080-exec-14] i.v.youmakemesmile.lock.TestController : Count : 0
해결 방안1
과 같이 초기 값 100
에서 로직을 100번 실행한 결과이다.
마지막 로그의 Count
가 0를 출력한 것을 확인할 수 있다. 사실 당연한 결과로 Distributed Lock
은 해당 부분을 하나의 Thread
에서만 실행 가능하도록 Lock
를 잡아주는 것이지 Transaction
과는 연관이 없다.
이처럼 Distributed Lock
을 적용 할 경우 해당 로직의 Transaction
를 고려하여 방식을 적용하고 로직을 구현해야 할 것이다.
Annotation
& Aop
적용위의 해결 방안1
과 해결 방안2
은 Distributed Lock
획득과 반납으로 인한 BolilerPlate
코드가 존재하게 된다.
이러한 로직을 Annotation
과 Aop
를 활용하여 정리가능하다.
Annotation
Annotation
에는 Distributed Lock
획득을 위한 key
, waitTime
, leaseTime
, timeUnit
를 정의한다.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedLock {
String key();
long waitTime() default 10000L;
long leaseTime() default 3000L;
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
Aspect
의 @Around
에서 @RedLock
을 PointCut
으로 설정하여 해당 JoinPoint
가 실행하기 이전에 tryLock
를 통해 Lock
획득을 시도하며 이후에 finally
에서 ApplicationEvent
를 발행하는 로직이다.
@Aspect
@Component
@RequiredArgsConstructor
public class RedLockAop {
private final RedissonClient redissonClient;
private final ApplicationEventPublisher applicationEventPublisher;
@Around("@annotation(io.velog.youmakemesmile.lock.RedLock)")
public Object lock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
RedLock redLock = signature.getMethod().getAnnotation(RedLock.class);
RLock lock = redissonClient.getLock(getDynamicValue(signature.getParameterNames(), proceedingJoinPoint.getArgs(), redLock.key()).toString());
try {
if (lock.tryLock(redLock.waitTime(), redLock.leaseTime(), redLock.timeUnit())) {
return proceedingJoinPoint.proceed();
} else {
throw new RuntimeException();
}
} catch (Exception e) {
throw e;
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
applicationEventPublisher.publishEvent(lock);
}
}
}
public static Object getDynamicValue(String[] parameterNames, Object[] args, String key) {
ExpressionParser parser = new SpelExpressionParser();
StandardEvaluationContext context = new StandardEvaluationContext();
for (int i = 0; i < parameterNames.length; i++) {
context.setVariable(parameterNames[i], args[i]);
}
return parser.parseExpression(key).getValue(context, Object.class);
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
public void test3(RLock rLock) {
rLock.unlock();
}
}
@Transactional
@RedLock(key = "'TEST'")
public Item test1() {
Item item = itemRepository.findById("TEST").get();
item.setCount(item.getCount() - 1);
return itemRepository.save(item);
}
해결 방안1
의 로직과 거의 일치하며 다른 부분은 Transaction
분리를 위해서 Service
를 만들어 해당 메소드에 Propagation.REQUIRES_NEW
를 정의하고 해당 메소드를 호출해야 한다는 점이다.
@Aspect
@Component
@RequiredArgsConstructor
public class RedLockAop {
private final RedissonClient redissonClient;
private final RequiresNewService requiresNewService;
private final ApplicationEventPublisher applicationEventPublisher;
@Around("@annotation(io.velog.youmakemesmile.lock.RedLock)")
public Object lock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
RedLock redLock = signature.getMethod().getAnnotation(RedLock.class);
RLock lock = redissonClient.getLock(getDynamicValue(signature.getParameterNames(), proceedingJoinPoint.getArgs(), redLock.key()).toString());
try {
if (lock.tryLock(redLock.waitTime(), redLock.leaseTime(), redLock.timeUnit())) {
return requiresNewService.proceed(proceedingJoinPoint);
} else {
throw new RuntimeException();
}
} catch (Exception e) {
throw e;
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
public static Object getDynamicValue(String[] parameterNames, Object[] args, String key) {
ExpressionParser parser = new SpelExpressionParser();
StandardEvaluationContext context = new StandardEvaluationContext();
for (int i = 0; i < parameterNames.length; i++) {
context.setVariable(parameterNames[i], args[i]);
}
return parser.parseExpression(key).getValue(context, Object.class);
}
}
@Service
public class RequiresNewService {
@Transactional(propagation = Propagation.REQUIRES_NEW)
public Object proceed(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
return proceedingJoinPoint.proceed();
}
}
해결 방안1
와 해결 방안2
를 테스트하다 요청 수를 증가시키면 다음과 같은 Time Out
이 발생하는 경우가 존재하였다.
HikariPool-1 - Connection is not available, request timed out after 30008ms.
발생 이유는 Hikari Conntection Pool Size
와 Tomcat Max Worker Thread
의 수에 따른 Hikari DeadLock
이 발생한 경우였다.
위와 같은 순서에 의해서 Hikari DeadLock
상태가 발생하고 Distributed Lock
획득 대기를 하고 있는 Thread
의 Connection
들과 Distributed Lock
은 획득 했으나 새로운 Connection
획득을 대기하는 Connection
들에서 TimeOut
이 발생하는 것이다.
해당 문제의 해결 방안은 정답이 있을 수는 없다고 생각한다. 지금같은 경우는 조건이 모든 요청이 같은 요청이라는 경우였다.
하지만 실제 업무에서는 다른 요청도 동시에 발생할것이라 다른 로직에서 몇개의 Connection
를 사용할지는 모른다.
위와 같은 경우라면 Hikari Database Connection Pool
의 개수를 Worker Thread Pool
의 개수보다 여유롭게 설정하면 해결된다.
그런데 그렇다고 Hikari Connection
의 개수를 증가 시킨다면 해당 로직에서는 Connection
만 획득하고 Distributed Lock
획득 대기를 하는 Thread
만 증가하여 의미 없는 Connection
만 증가하고 Database
의 부하만 발생한다.
결국 해당 어플리케이션의 특성을 고려하여 적절히 Worker Thread Pool
의 개수와 Hikari Connection Pool
의 개수를 조절하여 사용해야 한다.