[Spring] 스프링에서 동시성 문제 해결 방법 (1) - synchronized, Lock 사용

청포도봉봉이·2024년 5월 3일
1

Spring

목록 보기
31/35
post-thumbnail

코드는 Github에서 보실 수 있습니다.

애플리케이션을 개발하면 만약에 동시에 누군가 게시글을 작성한다면 어떻게 될까? 아니면 동시에 상품을 주문하게 된다면 어떻게 될까? 등의 상상을 해봤을 겁니다.

저도 마찬가지로 재고 감소 로직을 짜던 중 동시성 문제에 대한 질문을 받았습니다.

이러한 동시성 문제를 해결하기 위해서 다른 분들도 많이 생각을 하셨을 겁니다. 그 방법에 대해 알아보겠습니다.

@SpringBootTest
class ProductServiceTest {
    @Autowired
    private ProductService productService;

    @Autowired
    private ProductRepository productRepository;

    private Long productId;

    @BeforeEach
    void setUp() {
        productId = productRepository.saveAndFlush(new Product(1L, 100))
                .getId();
    }

    @AfterEach
    void tearDown() {
        productRepository.deleteAll();
    }

    @Test
    void decreaseStockTest() {
        // given
        productService.decrease(productId);

        // when
        Product product = productRepository.getById(productId);

        // then
        assertThat(product.getQuantity()).isEqualTo(99);
    }
}

비즈니스 로직의 decrease() 메서드 통합 테스트 코드를 작성해보았습니다. 100개의 재고를 가진 상품을 생성하고 decrease()를 실행했을때 실제로 재고가 1개가 감소하는지 확인해보았습니다.

동시성 문제 발생 코드

위 코드는 동시성 문제가 발생할 수 있는 코드입니다. 만약에 요청이 100개가 동시에 온다면 어떻게 동작하는지 확인해보겠습니다.

    @Test
    void 재고를_동시에_100개_감소_시_잔여_재고_오류() throws InterruptedException {
        // given
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(30);
        CountDownLatch latch = new CountDownLatch(threadCount);

        // when
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    productService.decrease(productId);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();

        // then
        Product product = productRepository.getById(productId);
        assertThat(product.getQuantity()).isEqualTo(0);
    }

코드에서 ExecutorService에 의해 관리되는 쓰레드 풀의 크기는 30으로 설정되어 있습니다. 이는 한 번에 최대 30개의 작업을 병렬로 실행할 수 있다는 것을 의미합니다.

ExecutorService의 풀 크기가 30이므로 이 100번의 작업은 30개의 쓰레드에서 병렬적으로 처리되며 한 번에 처리할 수 있는 최대 작업 수는 30개입니다. 남은 작업들은 쓰레드 풀에서 쓰레드가 사용 가능해질 때까지 대기 상태에 있게 됩니다.

위 테스트 코드를 실행하면 위와 같이 실패하게 됩니다. 우리는 분명 100개의 재고를 감소시켰는데 실제 남아있는 재고는 83개인 상황이 생긴 것 입니다.

Race Condition

위 문제가 발생하는 이유는 바로 Race Condition 때문입니다.
Race Condition이란 두 개 이상의 프로세스 혹은 스레드가 공유 자원을 서로 사용하려고 경합(Race)하는 현상을 의미합니다.

저희가 생각하는 동작은
1. Thread1 에서 Product를 가져와 재고를 감소시킨다. (quantity = 99)
2. Thread2 에서 Product를 가져와 재고를 감소시킨다. (quantity = 98)

이런식으로 동작한다고 생각할 것입니다.

하지만 실패한 테스트 코드에서는 아래와 같은 상황이 발생합니다.

우리의 예제를 동작으로 살펴본다면
1. Thread1이 Product 가져온다. (quantity = 100)
2. Thread2이 Product 가져온다. (quantity = 100)
3. Thread1이 재고를 감소시킨다. (quantity = 99)
4. Thread2이 재고를 감소시킨다. (quantity = 99)

이렇게 덮어 씌워지는 상황입니다.

위 문제를 해결하기 위해서 하나의 쓰레드의 작업이 완료되고 다른 쓰레드가 공유 자원에 접근할 수 있게 해줘야 합니다.

해결방법

synchronized 키워드 사용

synchronized는 자바에서 제공하는 키워드입니다. 간단하게 한 개의 쓰레드만 접근 가능하도록 설정해줍니다.

@Service
@RequiredArgsConstructor
public class ProductService {

    private final ProductRepository productRepository;

    @Transactional
    public synchronized void decrease(Long id) {
        Product product = productRepository.getById(id);
        product.decrease();
        productRepository.saveAndFlush(product);
    }
}

테스트를 다시 실행해 보겠습니다.

synchronized를 사용했는데도 왜 테스트가 실패할까요?
원인은 스프링의 @Transactional의 동작 원리에 있습니다.
@Transactional이 붙은 메소드는 다음과 같이 Proxy 객체를 생성하여 트랜잭션 관련 처리를 해줍니다.

public class ProductServiceProxy {
		
		private ProductService productService;

		public StockServiceProxy(StockService stockService) {
				this.stockService = stockService;
		}

		public void decrease(Long id) {
				// 트랜잭션 시작 로직				
				...

				productService.decrease(id);


				// 트랜잭션 종료 로직
				...
		}
}

위와 같이 Proxy 클래스에서 decrease를 호출하여 트랜잭션 처리와 함께 로직을 실행합니다.

이때, 재고 감소가 DB에 반영되는 시점은 트랜잭션이 커밋되고 종료되는 시점입니다.
즉, 재고 감소 로직인 productService에서 decrease가 호출되고 트랜잭션이 종료되기 전까지는 재고 감소가 DB에 반영되지 않습니다.

@Transcational을 주석 처리하고 테스트를 실행하면 정상적으로 재고가 0이 됩니다.

그렇다면, 스프링의 @Transcational 때문에 정상적으로 synchronized가 동작하지 않았으니 @Transcational을 지워서 사용하면 될까요? synchronized의 문제점에 대해 살펴보겠습니다.

synchronized 문제점

  1. 트랜잭션이 보장되지 않습니다.
    위 코드는 @Transcational이 붙어있지 않아도 큰 문제가 발생하지 않지만, 하나의 트랜잭션으로 처리해야 하는 작업이 많아지는 경우 이들을 하나의 트랜잭션으로 처리할 수 없어집니다.

  2. 공유 데이터에 하나의 스레드만 접근이 가능하다는 조건이 하나의 프로세스에서만 보장됩니다.
    이러한 특징 때문에 Scale-out시, 즉 서버가 여러 대일 때 동시성이 보장되지 않습니다.


다중 서버 환경에서의 테스트

인텔리제이에서 포트를 달리하여 서버 2대를 실행하도록 하겠습니다.

위와 같이 Allow multiple instances를 추가해주세요.

server:
  port: 8080
server:
  port: 8085

위의 8080 포트로 한번 실행하고 그 다음 application.yml을 8085로 수정하고 서버를 또 실행하면 동시에 서버를 실행할 수 있습니다.

우선 단일서버에서 테스트를 실행해보겠습니다.

public class MultiServerProductTest {
    @Test
    void 단일_서버_환경에서의_테스트() throws InterruptedException {
        // given
        int threadCount = 100;
        RestTemplate restTemplate = new RestTemplate();
        ExecutorService executorService = Executors.newFixedThreadPool(40);
        CountDownLatch latch = new CountDownLatch(threadCount);

        // when
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
               try {
                   int port = 8080;
                   ResponseEntity<Void> forEntity = restTemplate
                        .getForEntity("http://localhost:" + port + "/products/1/decrease", Void.class);
               } finally {
                   latch.countDown();
               }
            });
        }
        latch.await();
    }
}

데이터베이스에 100개의 재고가 있었는데 테스트를 실행하면 결과가 아래와 같스빈다.

이를 통해 단일 서버에서는 synchronized 키워드가 동시성 문제를 방지해준다는 걸 확인할 수 있습니다.

이제 두 서버에 동시에 100개의 재고를 감소시키는 요청을 보내겠습니다.

    @Test
    void 다중_서버_환경에서의_테스트() throws InterruptedException {
        // given
        int threadCount = 100;
        RestTemplate restTemplate = new RestTemplate();
        ExecutorService executorService = Executors.newFixedThreadPool(40);
        CountDownLatch latch = new CountDownLatch(threadCount);

        // when
        for (int i = 0; i < threadCount; i++) {
            final int ii = i;
            executorService.submit(() -> {
                try {
                    int port = (ii % 2 == 0) ? 8080 : 8085;
                    ResponseEntity<Void> forEntity = restTemplate
                            .getForEntity("http://localhost:" + port + "/products/1/decrease", Void.class);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();
    }

결과를 보면 재고가 0이 아닌 50입니다. 이를 통해 다중 서버 환경에서는 synchronized 키워드를 통해 동시성 문제를 해결할 수 없음을 확인할 수 있습니다.


Lock 키워드 사용

synchronized 이외에도 java에서 제공해주는 Lock을 이용하여 해결할 수 있습니다.

기존 서비스의 synchronized 키워드를 삭제해줍니다.

@Service
@RequiredArgsConstructor
public class ProductService {

    private final ProductRepository productRepository;

    @Transactional
    public void decrease(Long id) {
        Product product = productRepository.getById(id);
        product.decrease();
        productRepository.saveAndFlush(product);
    }
}

그리고 Lock을 통해 동시성 제어를 담당하는 Facade 만들어주겠습니다.

Facade 패턴은 복잡한 시스템에 대한 간단한 인터페이스를 제공하는 구조적 디자인 패턴입니다. 이 패턴의 목적은 시스템의 복잡성을 줄이고 클라이언트가 시스템을 더 쉽게 사용할 수 있도록 하는 것입니다.

@Service
@RequiredArgsConstructor
public class LockProductFacade {

    private ConcurrentHashMap<String, Lock> locks = new ConcurrentHashMap<>();

    private final ProductService productService;

    public void decrease(Long id) throws InterruptedException {
        Lock lock = locks.computeIfAbsent(String.valueOf(id), key -> new ReentrantLock());
        boolean acquiredLock = lock.tryLock(3, TimeUnit.SECONDS);
        if (!acquiredLock) {
            throw new RuntimeException("Lock 획득 실패");
        }
        try {
            productService.decrease(id);
        } finally {
            lock.unlock();
        }
    }
}

위 코드는 decrease() 메서드를 실행할 때 Lock을 획득과 동시에 메서드를 실행합니다. 따라서 메서드가 종료될 때까지 Lock을 획득하지 못하며 메서드 실행을 하지 못하게 되어 동시성을 해결 할 수 있습니다.

테스트 코드를 짜고 실행해보겠습니다.

@SpringBootTest
class LockProductFacadeServiceTest {
    @Autowired
    private LockProductFacadeService lockProductFacadeService;

    @Autowired
    private ProductRepository productRepository;

    private Long productId;

    @BeforeEach
    void setUp() {
        productId = productRepository.saveAndFlush(new Product(1L, 100))
                .getId();
    }

    @AfterEach
    void tearDown() {
        productRepository.deleteAll();
    }

    @Test
    void Lock을_이용한_동시성_재고감소_성공() throws InterruptedException {
        // given
        int threadCount = 100;
        ExecutorService executorService = Executors.newFixedThreadPool(40);
        CountDownLatch latch = new CountDownLatch(threadCount);

        // when
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    lockProductFacadeService.decrease(productId);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();

        // then
        Product product = productRepository.getById(productId);
        assertThat(product.getQuantity()).isEqualTo(0);
    }
}

성공하는걸 확인할 수 있습니다.

그러나 Lock을 사용하는 방법도 다중 서버에 대한 동시성 문제를 해결해주진 않습니다.


LockRegistry (SpringBoot 3.2 이상)

SpringBoot 3.2 버전 이상부터는 Spring Integration 의존성을 추가한 후, LockRegistry를 통해 다음과 같이 간단하게 사용할 수 있습니다.

LockRegistry는 여러 스레드나 프로세스가 동일한 리소스에 대해 동시에 접근하는 것을 방지하기 위해 Lock을 관리하는 메커니즘을 제공합니다. 이는 Lock의 생성, 관리, 제거 등을 보다 간편하게 해주는 유틸리티로 볼 수 있습니다.

@Configuration
public class LockConfig {

    @Bean
    public LockRegistry lockRegistry() {
        return new DefaultLockRegistry();
    }
}
@Service
@RequiredArgsConstructor
public class LockRegistryProductFacadeService {

    private final LockRegistry lockRegistry;
    private final ProductService productService;

    public void decrease(Long id) throws InterruptedException {
        lockRegistry.executeLocked(String.valueOf(id), () -> {
            productService.decrease(id);
        });
    }
}

테스트 코드를 작성하면 아래와 같습니다.

@SpringBootTest
class LockRegistryProductFacadeServiceTest {
    @Autowired
    private LockRegistryProductFacadeService lockRegistryProductFacadeService;

	...

    @Test
    void LockRegistry를_이용한_동시성_재고감소_성공() throws InterruptedException {
        // given
		...

        // when
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                try {
                    lockRegistryProductFacadeService.decrease(productId);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();

        // then
        ...
    }
}

마찬가지로 RegistryLock을 이용한 방식도 단일 서버에서만 동시성 제어를 보장합니다.

마무리

다음 글에서는 데이터베이스의 낙관적 락비관적 락을 통해 다중 서버 환경에서 동시성 문제를 해결하는 방법에 대해 알아보겠습니다.






참고
https://ttl-blog.tistory.com/1567
https://ksh-coding.tistory.com/125

profile
서버 백엔드 개발자

0개의 댓글