재고시스템으로 알아보는 동시성 이슈 해결방법을 듣고 정리한 내용입니다.
@Test
public void 동시에_50명() throws InterruptedException {
// 처음 Quantity를 100으로 등록함.
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
int threadCount = 100;
// 비동기로 실행
ExecutorService executorService = Executors.newFixedThreadPool(32);
// 요청이 끝날 때까지 기다리는 용도
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
stockService.decrease(1L, 1L);
} finally {
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
// threadCount가 100이므로 100번 decrease하면 수량은 0이 되어야한다.
assertEquals(0, stock.getQuantity());
}
decrease 메서드는 단순히 아래처럼 구현되어있다.
public void decrease(Long quantity) {
if (this.quantity - quantity < 0) {
throw new RuntimeException("foo");
}
this.quantity -= quantity;
}
결과는? 실패한다.
왜냐하면 Race Condition이 일어났기 때문이다.
첫번째 스레드가 데이터를 가져가서 갱신하고, 그 다음 두번째 스레드가 데이터를 가져가서 갱신하는 것이 아닌,
첫번째 스레드가 데이터를 가져가서 갱신하는 동안 두번째 스레드가 공유 데이터(quantity)에 접근해 갱신하기 때문에, 문제가 발생하는 것이다.
예를 들어, quatity가 예제 코드처럼 100이었다면 Thread#1이 가져가서 99로 바꾸는 와중 (아직까지 quantity는 100인 상태) Thread#2도 그에 접근해 decrease를 수행한다면 순차적으로 진행되었을 땐 98이 되어야할 값이 둘 다 100 -> 99로 바꾸는 것을 수행하기 때문에 문제가 발생하는 것!
@Transactional(propagation = Propagation.REQUIRES_NEW)
public synchronized void decrease(Long id, Long quantity) {
Stock stock = stockRepository.findById(id).orElseThrow();
stock.decrease(quantity);
stockRepository.saveAndFlush(stock);
}
가장 먼저 decrease 메서드에 synchronized 키워드를 붙여보자. synchronized는 하나의 스레드만 해당 메서드에 접근할 수 있도록 해 준다. 다만 문제는 해결되지 않는다.
Why? 스프링에서는 @Transactional을 사용하게 되면 우리가 만든 클래스를 래핑해서 실행하게 된다. (Proxy로 감싸서 AOP가 동작할 수 있도록 한다.) 즉, 해당 메서드의 시작과 끝에 TX가 실행되고 종료될 수 있도록 한다.
실제 데이터베이스에 업데이트하기 전에 proxy에서 decrease에 접근할 수 있기 때문에 (갱신되기 이전 값에 접근 가능하여) 문제가 발생하는 것.
만약 @Transactional 애노테이션을 제거하고 테스트케이스를 실행한다면 정상적으로 동작할 것이다.
그럼 Synchronized를 사용했을 때 문제는 없을까?
하나의 프로세스 안에서만 보장되는 키워드이기 때문에 서버가 한대일 땐 괜찮지만, 서버가 여러 대일 경우에는 데이터에 대한 접근을 여러 대에서 할 수 있게 된다.
Lock을 걸어서 정합성을 맞추는 방법. exclusive lock을 걸게되며 다른 트랜잭션에서는 lock이 해제되기전에 데이터를 가져갈 수 없다. 단, 데드락이 발생할 수 있다.@Lock(value = LockModeType.PESSIMISTIC_WRITE)
@Query("select s from Stock s where s.id=:id")
Stock findByIdWithPessimisticLock(Long id);
@Lock(value = LockModeType.OPTIMISTIC)
@Query("select s from Stock s where s.id = :id")
Stock findByIdWithOptimisticLock(Long id);`
Stock 엔티티에는 version을 명시해주어야한다.
@Version
private Long version;
별도의 락을 잡지않아 Pessimistic Lock보다 성능이 좋지만, 업데이트에 실패한 경우에 대한 처리를 개발자가 직접 해줘야함. 충돌이 빈번히 일어나지 않는 경우에 적합하다.
public interface LockRepository extends JpaRepository<Stock, Long> {
@Query(value = "select get_lock(:key, 3000)", nativeQuery = true)
void getLock(String key);
@Query(value = "select release_lock(:key)", nativeQuery = true)
void releaseLock(String key);
}
락을 사용하는 경우 실무에서는 데이터 소스를 분리하는 것이 낫다. 커넥션 풀이 부족해질 수 있기 때문.
위의 LockRepository를 decrease의 호출 전과 후에 getLock, releaseLock을 해줌으로써 사용할 수 있다.
public Boolean lock(Long key) {
return redisTemplate
.opsForValue()
.setIfAbsent(generateKey(key), "lock", Duration.ofMillis(3_000));
}
public Boolean unlock(Long key) {
return redisTemplate.delete(generateKey(key));
}
private String generateKey(Long key) {
return key.toString();
}
구현이 간단하지만 spin-lock 방식이므로 레디스에 부하를 줄 수 있어서, sleep을 활용해 락 획득 - 재시도 간에 텀을 두어야한다.
재시도가 필요하지 않은 lock은 lettuce를 활용하는 것이 나을 수 있다.
public void decrease(Long key, Long quantity) {
RLock lock = redissonClient.getLock(key.toString());
try {
boolean available = lock.tryLock(10, 1, TimeUnit.SECONDS);
if (!available) {
System.out.println("lock 획득 실패");
return;
}
stockService.decrease(key, quantity);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
Redisson을 활용하려면 별도의 라이브러리를 활용해줘야한다.
재시도가 필요한 경우에는 redisson을 활용하는 것이 나을 수 있다.