JPA 락킹 포스트 에서 JPA 가 DB 에 락을 거는 방법에 대해서 학습을 했었다. 해당 포스트는 JPA 락킹 포스트 에서 공부했었던 내용과 거기에 추가적인 내용을 더해 예제를 만들어보며 락킹을 공부했던 내용을 기록하고자 작성하게 됐다.
이전 포스트 작성한 바와 같이 동시성을 처리하기 위해서 다양한 방법으로 락을 걸 수 있다. 이전 포스트에서는 비관적 락, 낙관적 락만 다뤘지만 이 포스트에는 분산락(NamedLock)을 추가해서 예제를 작성했다.
다만 낙관적 락의 경우, APP 레벨에서 락킹이 일어나고 구현이 간단하기 때문에 낙관적 락과 비관적 락 중 비관적 락만 테스트를 진행했다.
@Entity
@Getter
@NoArgsConstructor
public class Stock {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long productId;
private Long qty;
public Stock(Long productId, Long qty) {
this.productId = productId;
this.qty = qty;
}
public void decrease(Long qty) {
if(this.qty - qty < 0 ){
throw new RuntimeException();
}
this.qty -= qty;
}
}
@Service
@RequiredArgsConstructor
public class StockService {
private final StockRepository stockRepository;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void decrease(Long id, Long qty) {
Stock stock = stockRepository.findById(id).get();
stock.decrease(qty);
}
}
public interface StockRepository extends JpaRepository<Stock, Long> {
}
비관적 락은 @Lock
을 사용해서 구현할 수 있다.
public interface StockRepository extends JpaRepository<Stock, Long>{
@Lock(value = LockModeType.PESSIMISTIC_WRITE)
@Query("select s from Stock s where s.id = :id")
Stock findByIdWithPessimisticLock(@Param("id") Long id);
}
@Lock
과 LockModeType
을 사용하면 DB에 락을 걸 수 있다.@SpringBootTest
class StockServiceTest {
@Autowired
private StockService stockService;
@Autowired
private StockRepository stockRepository;
@BeforeEach
void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@Test
public void 재고감소_비관적락() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for(int i=0;i<threadCount;i++) {
executorService.submit(() ->{
try {
stockService.decreaseWithPessimisticLock(1L, 1L);
}finally{
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
assertEquals(0L, stock.getQty()); // success
}
}
select ... for Update
쿼리가 발생하면서 DB에 락킹을 하게 된다. 분산 락이라는 것은 분산 서버 환경에서 공통된 저장소(DB)를 사용해서 자원이 사용 중인지 체크/획득/반납하는 형태로 분산된 서버들 간의 동기화 처리를 지원하는 락킹을 말한다.
MySQL 에서 User Level Lock을 활용하면 분산락을 구현할 수 있다. 여기서 User Level Lock 이라는 것은, 사용자가 특정 문자열에 Lock을 걸 수 있는 Lock을 의미한다. 관련 메서드는 아래와 같다.
여기서 의문점이 생겼었다. Pessimistic Lock 의 경우에도 record에 락을 걸기 때문에 분산 환경에서도 충분히 사용할 수 있다고 생각했기 때문이었다. 이와 관련해서 구글링을 해본 결과 MySQL을 이용한 분산락으로 여러 서버에 걸친 동시성 관리 의 댓글에 있는 답변에서 원하는 답변을 찾을 수 있었다.
pessimistic lock을 사용할 경우, timeout을 설정하는 것이 까다롭기 때문에 timeout 설정이 간단한 named lock을 사용하는게 나은 경우도 있다는 답변이었다.+ 2022.12.29
추가로, NamedLock을 사용하면 복잡한 로직을 처리할 때 로직을 하나의 트랜잭션으로 묶을 수 있다는 장점이 있다.
public interface LockRepository extends JpaRepository<Stock, Long>{
@Query(value = "select get_lock(:key, 3000)", nativeQuery = true)
void getLock(@Param("key") String key);
@Query(value = "select release_lock(:key)", nativeQuery = true)
void releaseLock(@Param("key") String key);
}
@Component
@RequiredArgsConstructor
public class NamedLockFacade {
private final LockRepository lockRepository;
private final StockService stockService;
@Transactional
public void decreaseWithNamedLock(Long id, Long qty) {
try {
lockRepository.getLock(id.toString());
stockService.decrease(id, qty);
}finally {
lockRepository.releaseLock(id.toString());
}
}
}
@SpringBootTest
class StockServiceTest {
@Autowired
private StockService stockService;
@Autowired
private StockRepository stockRepository;
@Autowired
private NamedLockFacade namedLockFacade;
@BeforeEach
void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@Test
public void 재고감소_네임드락() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for(int i=0;i<threadCount;i++) {
executorService.submit(() ->{
try {
namedLockFacade.decreaseWithNamedLock(1L, 1L);
}finally{
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
assertEquals(0L, stock.getQty());
}
Redis 로 Named Lock을 구현하는 방법에는 크게 두 가지가 있다. Lettuce
와 Redisson
이다.
Lettuce
는 Redis에 Key를 넣어서 접근하는 스레드들이 이를 Mutex 처럼 활용하는 방식이다. 그렇기 때문에 spin lock이 발생한다는 특징이 있다. spin lock 방식으로 대기하기 때문에 retry 로직을 개발자가 직접 작성해야 한다.
Redisson
은 Redis의 Pub/Sub을 활용한 방식이다. 스레드가 Key를 반납할 때 channel 에 메세지를 pub 하고, 다른 스레드가 이 메세지를 sub 하는 방식으로 진행된다.
그렇기 떄문에 Lettuce
와 같이 spin lock 이 발생하지 않아 Redis에 부하가 적고, retry 를 기본으로 제공하며 lock 을 라이브러리 차원에서 지원한다. (redisson의 lock 사용법을 따로 공부해야 한다는 단점도 있다.)
결과적으로 재시도가 필요한 경우라면 Redisson
, 재시도가 필요하지 않은 경우라면 Lettuce
를 추천한다고 한다.
@Configuration
public class RedisConfig {
@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory("127.0.0.1",6379);
}
@Bean
public RedisTemplate<String, String> redisTemplate(){
RedisTemplate<String, String> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory());
return template;
}
}
@Component
@RequiredArgsConstructor
public class LettuceLockRepository {
private final RedisTemplate<String, String> redisTemplate;
public Boolean lock(Long key) {
return redisTemplate
.opsForValue()
.setIfAbsent(generateKey(key), "lock", Duration.ofMillis(3000));
}
public Boolean unlock(Long key) {
return redisTemplate.delete(generateKey(key));
}
private String generateKey(Long key) {
return key.toString();
}
}
@Component
@RequiredArgsConstructor
public class LettuceLockFacade {
private final LettuceLockRepository redisLockRepository;
private final StockService stockService;
public void decrease(Long key, Long qty) throws InterruptedException {
while(!redisLockRepository.lock(key)) {
Thread.sleep(100);
}
try {
stockService.decrease(key, qty);
}finally {
redisLockRepository.unlock(key);
}
}
}
@SpringBootTest
class StockServiceTest {
@Autowired
private StockRepository stockRepository;
@Autowired
private LettuceLockFacade lettuceLockFacade;
@BeforeEach
void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@Test
public void 재고감소_레디스_lettuce() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for(int i=0;i<threadCount;i++) {
executorService.submit(() ->{
try {
lettuceLockFacade.decrease(1L, 1L);
} catch (InterruptedException e) {
throw new RuntimeException();
}finally{
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
assertEquals(0L, stock.getQty());
}
}
Redisson은 구현이 매우 간단하다.
implementation 'org.redisson:redisson-spring-boot-starter:3.17.4'
@Component
@RequiredArgsConstructor
public class RedissonLockFacade {
private final RedissonClient redissonClient;
private final StockService stockService;
public void decrease(Long key, Long qty) {
RLock lock = redissonClient.getLock(key.toString());
try {
boolean available = lock.tryLock(10, 1, TimeUnit.SECONDS);
if(!available) {
System.out.println("lock 획득 실패");
return;
}
stockService.decrease(key, qty);
}catch (Exception e) {
throw new RuntimeException();
}finally {
lock.unlock();
}
}
}
RedissonClient
를 사용하면 lock을 획득하고 반납하는 작업을 손쉽게 구현할 수 있다.@SpringBootTest
class StockServiceTest {
@Autowired
private StockRepository stockRepository;
@Autowired
private LettuceLockFacade lettuceLockFacade;
@BeforeEach
void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@Test
public void 재고감소_레디스_redisson() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for(int i=0;i<threadCount;i++) {
executorService.submit(() ->{
try {
redissonLockFacade.decrease(1L, 1L);
}finally{
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
assertEquals(0L, stock.getQty());
}
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedissonLock {
String key();
TimeUnit timeUnit() default TimeUnit.SECONDS;
// 락 대기 시간
long waitTime() default 5L;
// 락 해제 시간
long leaseTime() default 2L;
}
@Aspect
@Component
@RequiredArgsConstructor
public class RedissonLockAspect {
private final RedissonClient redissonClient;
private final RedissonTransactionCall redissonTransactionCall;
@Around("@annotation(ex.concurrency.aop.RedissonLock)")
public Object lock(ProceedingJoinPoint joinPoint) throws Throwable{
RedissonLock redissonLock = ((MethodSignature)joinPoint.getSignature())
.getMethod().getAnnotation(RedissonLock.class);
RLock lock = redissonClient.getLock(redissonLock.key());
try {
boolean available = lock.tryLock(redissonLock.waitTime(),
redissonLock.leaseTime(),
redissonLock.timeUnit());
if(!available) {
System.out.println("lock 획득 실패");
return false;
}
return redissonTransactionCall.proceed(joinPoint);
}catch (Exception e) {
throw new RuntimeException();
}finally {
lock.unlock();
}
}
}
@Component
public class RedissonTransactionCall {
@Transactional(propagation = Propagation.REQUIRES_NEW)
public Object proceed(ProceedingJoinPoint joinPoint) throws Throwable{
return joinPoint.proceed();
}
}
RedissonTransactionCall
를 만들어서 @Transactional(propagation = Propagation.REQUIRES_NEW)
를 선언해줬다.@Service
@RequiredArgsConstructor
public class StockService {
private final StockRepository stockRepository;
@RedissonLock(key = "#{args[0]}")
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void decrease(Long id, Long qty) {
Stock stock = stockRepository.findById(id).get();
stock.decrease(qty);
}
}
@SpringBootTest
class StockServiceTest {
@Autowired
private StockRepository stockRepository;
@Autowired
private LettuceLockFacade lettuceLockFacade;
@BeforeEach
void before() {
Stock stock = new Stock(1L, 100L);
stockRepository.saveAndFlush(stock);
}
@Test
public void 재고감소_레디스_redisson_aop() throws InterruptedException {
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
for(int i=0;i<threadCount;i++) {
executorService.submit(() ->{
try {
stockService.decrease(1L, 1L);
}finally{
latch.countDown();
}
});
}
latch.await();
Stock stock = stockRepository.findById(1L).orElseThrow();
assertEquals(0L, stock.getQty());
}
}