동시성 문제란, 동일한 하나의 데이터에 2 이상의 스레드, 혹은 세션에서 가변 데이터를 동시에 제어할 때 나타는 문제로,
하나의 세션이 데이터를 수정 중일때, 다른 세션에서 수정 전의 데이터를 조회해 로직을 처리함으로써 데이터의 정합성이 깨지는 문제를 말합니다.
아래와 같이 재고 시스템을 만들어서 동시성 문제를 제어해보겠다.
@Entity
@Getter
@NoArgsConstructor
public class Stock {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long productId;
private Long quantity;
public Stock(Long productId, Long quantity) {
this.productId = productId;
this.quantity = quantity;
}
public void decrease(Long quantity) {
if (this.quantity - quantity < 0) {
throw new RuntimeException("재고 부족");
}
this.quantity = this.quantity - quantity;
}
}
public interface StockRepository extends JpaRepository<Stock, Long> {
}
@Service
@RequiredArgsConstructor
public class StockService {
private final StockRepository stockRepository;
/**
* 재고 감소
*/
@Transactional
public void decrease(final Long id, final Long quantity) {
Stock stock = stockRepository.findById(id).orElseThrow();
stock.decrease(quantity);
stockRepository.saveAndFlush(stock);
}
}
일반적인 재고 감소 로직이다.
@SpringBootTest
class StockServiceTest {
@Autowired
private StockService stockService;
@Autowired
private StockRepository stockRepository;
@BeforeEach
public void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@AfterEach
public void after() {
stockRepository.deleteAll();
}
@Test
public void stock_decrease() {
stockService.decrease(1L, 1L);
Stock stock = stockRepository.findById(1L).orElseThrow();
assertThat(stock.getQuantity()).isEqualTo(99L);
}
@Test
@DisplayName("동시에_100개의_요청")
public void requests_100_AtTheSameTime() throws InterruptedException {
int threadCount = 100;
//멀티스레드 이용 ExecutorService : 비동기를 단순하게 처리할 수 있또록 해주는 java api
ExecutorService executorService = Executors.newFixedThreadPool(32);
//다른 스레드에서 수행이 완료될 때 까지 대기할 수 있도록 도와주는 API - 요청이 끝날때 까지 기다림
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
stockService.decrease(1L, 1L);
}
finally {
latch.countDown();
}
});
}
latch.await();//다른 쓰레드에서 수행중인 작업이 완료될때까지 기다려줌
Stock stock = stockRepository.findById(1L).orElseThrow();
//100 - (1*100) = 0
//race condition 이 발생함 동시에 변경하려고 할때 발생하는 문제
//하나의 쓰레드의 작업이 완료되기 이전에 쓰레드가 공유 자원에 접근하였기 떄문에 값이 공유 자원의 값이 다르다.
assertThat(stock.getQuantity()).isEqualTo(0L);
}
}
<코드 설명>
그러나 테스트는 실패한다.
<실패 원인>
<예상 작업 순서>
<본 테스트에서의 작업 순서>
Race Condition을 해결하기 위해서는 하나의 스레드가 작업을 완료한 후에, 다른 스레드가 공유된 자원에 접근 가능하도록 해야 한다.
따라서, 이를 해결하기 위해 3가지 해결방법을 적용해본다
1. Java sychronized 동기화
2. DB Lock
Synchronized?
따라서 synchronized는, 현제 데이터를 사용하고 있는 해당 스레드를 제외하고 나머지 스레드들은 데이터 접근을 막아 순차적으로 데이터에 접근할 수 있도록 해준다.
코드는 아래와 같이 수정한다.
@Service
@RequiredArgsConstructor
public class SynchronizedStockService {
private final StockRepository stockRepository;
/**
* 재고 감소
*/
public synchronized void decrease(final Long id, final Long quantity) {
Stock stock = stockRepository.findById(id).orElseThrow();
stock.decrease(quantity);
stockRepository.saveAndFlush(stock);
}
}
@SpringBootTest
class SynchronizedStockServiceTest{
@Autowired
private SynchronizedStockService synchronizedStockService;
@Autowired
private StockRepository stockRepository;
@BeforeEach
public void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@AfterEach
public void after() {
stockRepository.deleteAll();
}
@Test
public void stock_decrease() {
synchronizedStockService.decrease(1L, 1L);
Stock stock = stockRepository.findById(1L).orElseThrow();
assertThat(stock.getQuantity()).isEqualTo(99L);
}
@Test
@DisplayName("동시에_100개의_요청")
public void requests_100_AtTheSameTime() throws InterruptedException {
int threadCount = 100;
//멀티스레드 이용 ExecutorService : 비동기를 단순하게 처리할 수 있또록 해주는 java api
ExecutorService executorService = Executors.newFixedThreadPool(32);
//다른 스레드에서 수행이 완료될 때 까지 대기할 수 있도록 도와주는 API - 요청이 끝날때 까지 기다림
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
synchronizedStockService.decrease(1L, 1L);
}
finally {
latch.countDown();
}
});
}
latch.await();//다른 쓰레드에서 수행중인 작업이 완료될때까지 기다려줌
Stock stock = stockRepository.findById(1L).orElseThrow();
assertThat(stock.getQuantity()).isEqualTo(0L);
}
}
기존 코드에서 @Transactional 제거와 synchornized 키워드만을 수정해주면, 테스트 코드는 그대로 두어도 정상 수행이 된다,
Pessimistic Lock(비관적 락) 이란?
따라서 Pessimistic Lock을 사용하면
위와 같이 정합성이 맞춰진다.
@Service
@RequiredArgsConstructor
public class PessimisticLockStockService {
private final StockRepository stockRepository;
@Transactional
public void decrease(final Long id, final Long quantity) {
Stock stock = stockRepository.findByWithPessimisticLock(id);
stock.decrease(quantity);
stockRepository.saveAndFlush(stock);
}
}
public interface StockRepository extends JpaRepository<Stock, Long> {
@Lock(value = LockModeType.PESSIMISTIC_WRITE)
@Query("select s from Stock s where s.id = :id")
Stock findByWithPessimisticLock(final Long id);
}
@Service
@RequiredArgsConstructor
public class StockService {
private final StockRepository stockRepository;
@Transactional
public void decrease(final Long id, final Long quantity) {
Stock stock = stockRepository.findByWithPessimisticLock(id);
stock.decrease(quantity);
stockRepository.saveAndFlush(stock);
}
}
@SpringBootTest
class PessimisticLockStockServiceTest {
@Autowired
private PessimisticLockStockService stockService;
@Autowired
private StockRepository stockRepository;
@BeforeEach
public void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@AfterEach
public void after() {
stockRepository.deleteAll();
}
@Test
@DisplayName("Pessimistic LOCK 동시에_100개의_요청")
public void Pessimistic_requests_100_AtTheSameTime() throws InterruptedException {
int threadCount = 100;
//멀티스레드 이용 ExecutorService : 비동기를 단순하게 처리할 수 있또록 해주는 java api
ExecutorService executorService = Executors.newFixedThreadPool(32);
//다른 스레드에서 수행이 완료될 때 까지 대기할 수 있도록 도와주는 API - 요청이 끝날때 까지 기다림
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
stockService.decrease(1L, 1L);
}
finally {
latch.countDown();
}
}
);
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
//100 - (1*100) = 0
assertThat(stock.getQuantity()).isEqualTo(0L);
}
}
위와 같이 성공하는 모습을 볼 수 있다.
장점
단점
Optimistic Lock(낙관적 락) 이란?
<Optimistic Lock 과정>
1) 서버 1이 version1 임을 조건절에 명시하면서 업데이트 쿼리를 날림
2) version1 쿼리가 업데이트 되어서 디비는 version2가 됨
3) 서버 2가 version1로 업데이트 쿼리를 날리면 버전이 맞지 않아 실패
4) 쿼리가 실패하면 서버2에서 다시 조회하여 버전을 맞춘 후 업데이트 쿼리를 날리는 과정을 거침
1~2 과정
3~4 과정
<Optimistic Lock 점유 과정>
코드에 Optimistic Lock을 적용해보면
@Entity
@Getter
@NoArgsConstructor
public class Stock {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long productId;
private Long quantity;
@Version
private Long version;
public Stock(Long productId, Long quantity) {
this.productId = productId;
this.quantity = quantity;
}
public void decrease(Long quantity) {
if (this.quantity - quantity < 0) {
throw new RuntimeException("재고 부족");
}
this.quantity = this.quantity - quantity;
}
}
public interface StockRepository extends JpaRepository<Stock, Long> {
@Lock(value = LockModeType.PESSIMISTIC_WRITE)
@Query("select s from Stock s where s.id = :id")
Stock findByWithPessimisticLock(Long id);
@Lock(value = LockModeType.OPTIMISTIC)
@Query("select s from Stock s where s.id = :id")
Stock findByWithOptimisticLock(Long id);
}
@Service
@RequiredArgsConstructor
public class OptimisticLockStockService {
private final StockRepository stockRepository;
@Transactional
public void decrease(Long id, Long quantity) {
Stock stock = stockRepository.findByWithOptimisticLock(id);
stock.decrease(quantity);
stockRepository.saveAndFlush(stock);
}
}
@Service
@RequiredArgsConstructor
public class OptimisticLockStockFacade {
private final OptimisticLockStockService optimisticLockStockService;
public void decrease(Long id, Long quantity) throws InterruptedException {
while (true) {
try {
optimisticLockStockService.decrease(id, quantity);
break;
} catch (Exception e) {
Thread.sleep(50);
}
}
}
}
해당 클래스는 여러 스레드가 동시에 접근 시 version 정보가 맞지 않아 exception 이 발생할 경우 재 요청을 하기 위해 존재하는 서비스 클래스이다.
즉, Lock을 어플리케이션 단계에서 해결한다는 의미이다.
기존과 동일하게 테스트 코드를 작성하고 진행시켜보면
@DisplayName("낙관적락 재고 선점 테스트")
@SpringBootTest
public class OptimisticLockStockFacadeTest {
@Autowired
private OptimisticLockStockFacade optimisticLockStockFacade;
@Autowired
private StockRepository stockRepository;
@BeforeEach
public void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@AfterEach
public void after() {
stockRepository.deleteAll();
}
@Test
void OptimisticLock_동시에_100개의_요청() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);//비동기로 실행하는 작업을 단순화하여 사용
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
optimisticLockStockFacade.decrease(1L, 1L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();;
}
});
}
latch.await();//다른 쓰레드에서 수행중인 작업이 완료될때까지 기다려줌
Stock stock = stockRepository.findById(1L).orElseThrow();
assertEquals(0L, stock.getQuantity());
}
}
위와 같이 버전 정보를 포함해서 쿼리가 나가고 정상 작동한다.
Named Lock 이란?
📌 Named Lock은 Passimistic Lock 과 유사하지만, Passimistic Lock 은 row 나 table 단위로 락을 걸지만, Named Lock 은 metadata 단위로 락을 건다는 차이점이 존재
<Named Lock 점유 과정>
1. Named Lock은 Stock에 락을 걸지 않고, 별도의 공간에 락을 건다
2. session-1 이 1이라는 이름으로 락을 건다면, session 1 이 1을 해지한 후에 락을 얻을 수 있습니다.
public interface LockRepository extends JpaRepository<Stock, Long> {
@Query(value = "select get_lock(:key, 3000)", nativeQuery = true)
void getLock(String key);
@Query(value = "select release_lock(:key)", nativeQuery = true)
void releaseLock(String key);
}
예제에서는 편의성을 위해서 Stock 엔티티를 사용하지만, 실무에서는 별도의 JDBC 를 사용해야 한다.
NamedLockFacade
@Component
@RequiredArgsConstructor
public class NamedLockFacade {
private final LockRepository lockRepository;
private final NamedLockStockService stockService;
@Transactional
public void decrease(final Long id, final Long quantity) {
try {
lockRepository.getLock(id.toString());
stockService.decrease(id, quantity);
}finally {
//락의 해제
lockRepository.releaseLock(id.toString());
}
}
}
실제 로직 실행 전 후로 getLock 과 releaseLock을 수행해야 하기 때문에 Facade 클래스를 생성한다.
lockRepository.getLock(id.toString());을 통해 해당 id의 Named Lock을 획득한다.
lockRepository.releaseLock(id.toString());을 통해 락을 해제한다.
@Service
@RequiredArgsConstructor
public class NamedLockStockService {
private final StockRepository stockRepository;
/**
* 재고 감소
*/
//부모의 트랜잭션과 별도로 실행되기 위해서 propagation을 수정함
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void decrease(Long id, Long quantity) {
Stock stock = stockRepository.findById(id).orElseThrow();
stock.decrease(quantity);
stockRepository.saveAndFlush(stock);
}
}
spring:
jpa:
hibernate:
ddl-auto: create
show-sql: true
properties:
hibernate:
# show_sql: true
# format_sql: true
dialect: org.hibernate.dialect.MySQL8Dialect
datasource:
# url: jdbc:h2:tcp://localhost/~/test
# username: sa
# password:
# driver-class-name: org.h2.Driver
url: jdbc:mysql://localhost:3306/concurrent?useSSL=false&useUnicode=true&serverTimezone=Asia/Seoul
username: root
password: ch122411
driver-class-name: com.mysql.cj.jdbc.Driver
hikari:
maximum-pool-size: 40
DataSource 를 사용해주어야하기 때문에 커넥션 풀 수를 늘려주어야 한다
이제 테스트 코드 작성하고 실행해보겠다.
@SpringBootTest
class NamedLockStockFacadeTest {
@Autowired
private NamedLockFacade namedLockFacade;
@Autowired
private StockRepository stockRepository;
@BeforeEach
public void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@AfterEach
public void after() {
stockRepository.deleteAll();
}
@Test
public void 동시에_100개의_요청() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
namedLockFacade.decrease(1L, 1L);
System.out.println("남은 재고 : " + stockRepository.findById(1L).orElseThrow().getQuantity());
} finally {
{
latch.countDown();
}
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
// 100 - (1 * 100) = 0
assertEquals(0L, stock.getQuantity());
}
쿼리문을 보면 lock을 얻고, 해제하는 과정을 반복한다.
Lettuce 란?
<Spin Lock 과정>
1. 쓰레드1이 키가 1인 데이터를 레디스에 set하면 처음엔 1이 없으므로 성공한다.
2. 쓰레드2가 키가 1인 데이터를 set하려 하면 1이 이미 있으므로 실패한다.
3. 성공을 위해 100ms마다 재시도하는 로직을 작성해서 성공할 때 까지 시도한다.
이제 Lettuce로 락을 구현해보자
우선 Redis 의존성을 설치한다.
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
@Component
public class RedisLockRepository {
private RedisTemplate<String, String> redisTemplate;
public RedisLockRepository(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public Boolean lock(Long key) {
return redisTemplate
.opsForValue()
.setIfAbsent(generateKey(key), "lock", Duration.ofMillis(3_000));
}
public Boolean unlock(Long key) {
return redisTemplate.delete(generateKey(key));
}
public String generateKey(Long key) {
return key.toString();
}
}
@Component
public class LettuceLockStockFacade {
private RedisLockRepository redisLockRepository;
private StockService stockService;
public LettuceLockStockFacade(RedisLockRepository redisLockRepository, StockService stockService) {
this.redisLockRepository = redisLockRepository;
this.stockService = stockService;
}
public void decrease(Long key, Long quantity) throws InterruptedException {
while (!redisLockRepository.lock(key)) {
Thread.sleep(100);
}
try {
stockService.decrease(key, quantity);
} finally {
redisLockRepository.unlock(key);
}
}
}
로직 실행 전 후로 Lock 획득과 해제를 수행해야 하므로 Facade 클래스를 추가한다.
1. SpinLock 방식으로 락을 얻기를 시도하고,
2. 락을 얻은 후, 재고 감소 비지니스 로직을 처리합니다.
3. 그 후, 락을 해제해준다.
@SpringBootTest
class LettuceLockStockFacadeTest {
@Autowired
private LettuceLockStockFacade lettuceLockStockFacade;
@Autowired
private StockRepository stockRepository;
@BeforeEach
public void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@AfterEach
public void after() {
stockRepository.deleteAll();
}
@Test
public void 동시에_100개의_요청() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
lettuceLockStockFacade.decrease(1L, 1L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
{
latch.countDown();
}
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
// 100 - (1 * 100) = 0
assertEquals(0L, stock.getQuantity());
}
}
구현이 간단하다는 장점이 있지만, Spin Lock 방식이, Lock 을 얻을 때까지 Lock 얻기를 시도하기 떄문에, 계속해서 Redis 에 접근해서 Redis에 부하를 줄 수 있다는 단점이 존재한다.
Redission 이란?
<Pub-Sub 과정>
implementation 'org.redisson:redisson-spring-boot-starter:3.17.4'
redisson은 lock 관련 클래스를 제공해주기 때문에 repository는 필요없다.
하지만 lock 획득, 해제는 직접 작성해야 하므로 facade 클래스를 생성한다.
@Component
@RequiredArgsConstructor
public class RedissonLockStockFacade {
private RedissonClient redissonClient;
private StockService stockService;
public RedissonLockStockFacade(RedissonClient redissonClient, StockService stockService) {
this.redissonClient = redissonClient;
this.stockService = stockService;
}
public void decrease(Long key, Long quantity) {
RLock lock = redissonClient.getLock(key.toString());
try {
boolean available = lock.tryLock(20, 1, TimeUnit.SECONDS);
if (!available) {
System.out.println("lock 획득 실패!");
return;
}
stockService.decrease(key, quantity);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
@SpringBootTest
class RedissonLockStockFacadeTest {
@Autowired
private RedissonLockStockFacade redissonLockStockFacade;
@Autowired
private StockRepository stockRepository;
@BeforeEach
public void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@AfterEach
public void after() {
stockRepository.deleteAll();
}
@Test
public void 동시에_100개의_요청() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
redissonLockStockFacade.decrease(1L, 1L);
} finally {
{
latch.countDown();
}
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
// 100 - (1 * 100) = 0
assertEquals(0L, stock.getQuantity());
}
}
Lettuce와 Redisson의 비교
구현이 간단하다
spring data redis를 이용하면 lettuce가 기본이기 때문에 별도의 라이브러리를 사용하지 않아도 된다.
spin lock 방식이기 때문에 동시에 많은 스레드가 lock 획득 대기 상태라면 redis에 부하가 갈 수 있다.
lock 획득 재시도를 기본으로 제공한다.
pub-sub방식으로 구현되어 있기 때문에 lettuce 대비 redis에 부하가 덜 간다.
별도 라이브러리를 사용해야 한다.
lock을 라이브러리 차원에서 제공하기 때문에 사용법을 공부해야한다.
실무에서는
재시도가 필요하지 않은 lock은 lettuce활용
재시도가 필요한 경우에는 redisson 활용
Mysql과 Redis 비교
이미 Mysql을 사용하고 있다면 별도의 비용없이 사용 가능하다.
어느정도의 트래픽까지는 문제 없이 활용이 가능하다.
Redis보다 성능이 좋지 않다
활용중인 Redis가 없다면 별도의 구축비용과 인프라 관리비용이 발생한다.
Mysql보다 성능이 좋다.
참고
https://devhooney.tistory.com/110
https://thalals.tistory.com/370#google_vignette
https://everydayyy.tistory.com/167
https://velog.io/@hyojhand/named-lock-distributed-lock-with-redis(Named Lock, Redis)
https://dkswnkk.tistory.com/681