해당 글은글로벌 캐시로 redis 사용해보기 + 동시성 제어 1편에 이어서, Synchronized 키워드를 통해서 동시성을 만족시키는 실습을 이어가 보도록 하겠습니다.
우선, 실습을 할 코드 배경을 살펴 보자면, 간단하게 Store 엔티티를 만들고, increas()메서드를 호출하여서 Count값을 증가시키는 실습을 해볼 것입니다.
@Entity
@Getter
@Setter
@Table(name = "store")
public class Store {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private Long storeId;
@Column(nullable = false)
private Long likeCount;
private static final long DEFAULT_INCREASED_LIKE_COUNT = 1;
// 기본 생성자 (JPA에서 사용)
protected Store() {}
public Store (long id,Long lcount){
storeId=id;
likeCount=lcount;
}
// 좋아요 수 증가 메서드
public void increase() {
this.likeCount += 1;
}
}
@Service
public class StoreService {
private final StoreLikeRepository storeLikeRepository;
// 생성자를 통한 StoreLikeRepository 주입
public StoreService(StoreLikeRepository storeLikeRepository) {
this.storeLikeRepository = storeLikeRepository;
}
@Transactional
public void increase(Long storeLikeId) {
try {
Store store = storeLikeRepository.findById(storeLikeId)
.orElseThrow(() -> new NoSuchElementException("Store not found with id: " + storeLikeId));
store.increase();
} catch (NoSuchElementException e) {
System.err.println("Error: " + e.getMessage());
}
}
}
@SpringBootTest
public class StoreLikeServiceTest {
@Autowired
private StoreLikeRepository storeLikeRepository;
@Autowired
private StoreService storeService;
@Test
public void testIncreaseLikeCount() {
// given
Store storeLike = storeLikeRepository.save(new Store(1L, 0L));
// when
storeService.increase(storeLike.getId());
Store findStoreLike = storeLikeRepository.findById(storeLike.getId()).orElseThrow();
// then
assertThat(findStoreLike.getLikeCount()).isEqualTo(1);
}
}
해당 Test는 Store를 저장하고, storeService에서 increase()메서드를 호출하여서 count값이 정상적으로 증가했는지 확인하는 간단한 예제 입니다.
정상적으로 통과했습니다.
이번에는 스레드를 사용하여서 해당 스레드들을 사용해서 동시에 총 100개의 좋아요를 증가시키는 로직을 수행해보도록 하겠습니다.
@Test
void testConcurrentLikeIncrease() throws InterruptedException {
// given
Store postLike = storeLikeRepository.save(new Store(1L, 0L));
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(20);
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
// when
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
storeService.increase(postLike.getId());
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
Store findViewCount = storeLikeRepository.findById(postLike.getId()).orElseThrow();
// then
assertThat(findViewCount.getLikeCount()).isEqualTo(100);
}
우선 해당 Test코드를 수행하면 @Transactional을 통해서 발생하는 문제들이 있는데, 해당 실습을 수행하면서 제가 만난 문제를 해결하는 과정을 이 글을 꼭 읽어주세요!!
코드에 대해서 간략하게 설명하자면, ExecutorService를 사용해서 스레드 풀을 20개를 만든다음에 for문을 100번돌면서 스레드 풀에서 스레드를 가져와서, 해당 스레드에서 increas()메서드를 호출하여서 count값을 올리는 것입니다.
여기서 중요한 부분은,
그런데 해당 코드를 실행해 보면,
동시성을 만족하지 못하는 것을 확인 할 수 있습니다.
멀티스레드에 의한 경쟁조건(Race Condition)이 발생하였기 때문입니다.
공유데이터인 likeCount에 스레드들이 동시에 접근해서 수정하기 때문에 순차적으로 좋아요가 증가되지 않습니다.
한마디로, 스레드 1이 좋아요를 증가시키지만 저장되기 전에, 스레드 2가 좋아요를 조회하고 해당 값을 토대로 증가 로직을 실행시키기 때문입니다.
해당 문제를 해결하기 위해서 이전 시간에 배웠던 Synchronized를 사용해 보도록 하겠습니다.
@Test
void synchronizedtest() throws InterruptedException {
// given
Store postLike = storeLikeRepository.save(new Store(1L, 0L));
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(20);
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
// when
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
storeService.increaseWithSynchronized(postLike.getId());
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
// then
Store findViewCount = storeLikeRepository.findById(postLike.getId())
.orElseThrow(() -> new NoSuchElementException("PostLike not found"));
assertThat(findViewCount.getLikeCount()).isEqualTo(100);
}
@Service
public class StoreService {
private final StoreLikeRepository storeLikeRepository;
// 생성자를 통한 StoreLikeRepository 주입
public StoreService(StoreLikeRepository storeLikeRepository) {
this.storeLikeRepository = storeLikeRepository;
}
@Transactional
public synchronized void increaseWithSynchronized(Long postLikeId) {
Store postLike = storeLikeRepository.findById(postLikeId)
.orElseThrow(() -> new NoSuchElementException("PostLike not found"));
postLike.increase();
}
...
우리는 Synchronized를 메서드에 적용하였으므로, 클래스의 인스턴스에 Lock을 건것입니다.
고로, 스레드 1에서 StoreService 인스턴스에 접근하여서 increaseWithSynchronized메서드를 실행하고있을때에는 스레드 2에서는 해당 메서드에 접근하지 못하고 대기하고 있다가, 스레드 1이 해당 메서드를 끝내고 락을 풀면 스레드 2에서 해당 메서드를 실행 할 수 있습니다.
하지만 결과를 보면 마찬가지로 실패하였습니다.
그렇다면, 동시성을 만족하도록 Synchronized를 사용했는데 실패한 이유가 무엇일까요?
그 이유는 서비스의 increaseWithSynchronized에 붙인 @Transactional 때문입니다.
Spring에서 @Transactional이 붙은 메서드는 AOP에 의해 아래와 같은 코드를 가지게 됩니다.
public class TxStoreService{
private final StoreService storeService;
ppublic synchronized void increaseWithSynchronized(Long postLikeId) {
staratTx();//트랜잭션 시작
...
postLike.increase();
endTx();//트랜잭션 끝
}
이때 아래와 같이 postLike.increase()메서드가 호출된 후 트랜잭션이 끝나기 전에 다른 쓰레드에서 postLike.increase()를 호출할 수 있습니다.
public class TxStoreService{
private final StoreService storeService;
ppublic synchronized void increaseWithSynchronized(Long postLikeId) {
staratTx();//트랜잭션 시작
...
postLike.increase();
//트랜잭션이 끝나기 전, 이 부분에서 다른 스레드가 increase메서드 호출 가능
endTx();//트랜잭션 끝
}
이 떄문에 동시성 문제가 해결되지 않은 것입니다.
고로, JPA의 변경 감지를 사용해서 Synchronized 블록 외부에 존재하는 트랜잭션 종료 시점에 데이터를 업데이트를 하는게 아니라, Synchronized 블록 내에서 데이터가 저장되도록, saveAndFlush()메서드를 사용해야합니다.
@Test
void synchronizedtest() throws InterruptedException {
// given
Store postLike = storeLikeRepository.save(new Store(1L, 0L));
int threadCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(20);
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
// when
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
storeService.increaseWithSynchronizedWithFlush(postLike.getId());
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
// then
Store findViewCount = storeLikeRepository.findById(postLike.getId())
.orElseThrow(() -> new NoSuchElementException("PostLike not found"));
assertThat(findViewCount.getLikeCount()).isEqualTo(100);
}
public synchronized void increaseWithSynchronizedWithFlush(Long postLikeId) {
Store postLike = storeLikeRepository.findById(postLikeId)
.orElseThrow(() -> new NoSuchElementException("PostLike not found"));
postLike.increase();
storeLikeRepository.saveAndFlush(postLike); // 변경 사항을 즉시 DB에 반영
}
해당 방법을 사용하면, 정상적으로 테스트가 성공하는 것을 볼 수 있습니다.
하지만 Synchronized 키워드를 사용하여 동시성을 보장하려고하면, 문제점이 존재한다.
요즘 대다수의 서비스 기업의 서버는 스케일 아웃을 통해서 여러대의 서버를 실행하고, 로드밸런스를 통해 부하를 분산하여 서비스를 제공한다.
그러나, Synchronized는 하나의 프로세스 내에서만 동시성을 보장하므로 이러한 환경에서는 동시성을 보장하는것이 불가능하다.
해당 내용을 실습해보자.
다중 서버 실행을 위해서, 파일을 복사하여 포트번호를 8081로 바꿔주고
Run -> Edit Configurations -> Modify Options -> Allow multiple instances를 체크 해주자.
그다음에, increase메서드를 호출하는 Controller를 하나 만들어준다.
@RequiredArgsConstructor
@Controller
public class StoreController {
private final StoreService storeService;
@GetMapping("/like")
public void like(){
storeService.increaseWithSynchronizedWithFlush(100L);
}
}
그다음 두개의 서버를 실행시킨 다음에, mysql에다가 inser쿼리를 날려서 id가 100인 스토어를 하나 만든다.
@Test
void decrease_with_100_request_to_multi_server() throws InterruptedException {
// given
int threadCount = 100;
RestTemplate restTemplate = new RestTemplate();
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
// when
for (int i = 0; i < threadCount; i++) {
final int ii = i;
executorService.submit(() -> {
try {
int port = (ii % 2 == 0) ? 8080 : 8081;
ResponseEntity<Void> responseEntity = restTemplate.getForEntity(
"http://localhost:" + port + "/like",
Void.class);
} finally {
latch.countDown();
}
});
}
latch.await();
}
해당 테스트를 실행하고 결과를 확인 해보면,
동시성을 만족하지 않는것을 확인 할 수 있습니다.
왜냐하면 Synchronized 키워드는 하나의 프로세스 내에서만 보장하는데, 서버가 2대 이상으로 늘어나는경우 여러 서버에서 데이터 접근이 가능하기 때문입니다.
고로 해당 문제를 해결하기 위해서는, 레디스의 분산락을 통해 동시성을 해결해야합니다.
해당 문제를 해결하는 방법에 대해서는 3편에서 소개해드리도록 하겠습니다.