이번 소모임 프로젝트를 하면서 해당 이벤트를 참여할 때 제한인원이 있는데 여러요청이 한꺼번에 들어왔을 시 동시성 이슈에 대해서 해결을 했었다.
DB lock 과 redisson, kafka 를 이용해서 동시성이슈를 해결했었다.
하지만 조금 더 개념확립과 익숙하게 사용하기 위해서 다시 한번 조그마한 프로젝트로 다른 방법인 redis의 incr 과 kafka를 이용해서 다양한 사례와 함께 실습을 할 것이다.
선착순 100명에게만 지급이 되어야 한다.
101개 이상 지급이 되면 안 된다.
순간적으로 몰리는 트래픽을 버틸 수 있어야 한다.
카운트 함수를 쓸 것이기 때문에 인덱스 설정을 해줬다.
@Entity
@AllArgsConstructor @NoArgsConstructor(access = AccessLevel.PROTECTED)
@Data
@Table( name = "coupons",
indexes = {
@Index(name = "idx_user_id", columnList = "userId")
})
@Builder
public class Coupon {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long userId;
}
@Service
@RequiredArgsConstructor
public class CouponService {
private final CouponRepository couponRepository;
public void publishCoupon(Long userId) {
long count = couponRepository.count();
if (count > 100) {
return ;
}
couponRepository.save(Coupon.builder().userId(userId).build());
}
}
repository 는 jpa만 extends 했기 때문에 생략..
@SpringBootTest
class CouponServiceTest {
@Autowired
private CouponService couponService;
@Autowired
private CouponRepository couponRepository;
@Test
public void 한명만응모() {
couponService.publishCoupon(1L);
long count = couponRepository.count();
assertThat(count).isEqualTo(1);
}
}
일반적으로 쿠폰하나는 정상적으로 발행이 된다. 그렇다면 굉장히 많은 요청이 순식간에 들어오게 된다면 ..? 테스트 코드로 1000번정도 실행을 해보자..
@Test
public void 여러명응모() throws InterruptedException {
// 동시에 여러개의 요청을 보낼것이기 때문에 멀티쓰레드 사용
// 1000개의 요청을 보낼 것 임.
int threadCount = 1000;
// 멀티쓰레드를 사용할 것 이기 때문에 ExecutorService 사용
// ExecutorService -> 병렬작업을 간단하게 할 수 있게 해주는 java api
ExecutorService executorService = Executors.newFixedThreadPool(32);
// 모든 요청이 끝날 때 까지 기다려야 하므로 CountDownLatch 사용..
// CountDownLatch 는 다른 쓰레드에서 사용하는 작업을 기다리도록 도와주는 class..
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
long userId = i;
executorService.submit(() -> {
try {
couponService.publishCoupon(userId);
} finally {
latch.countDown();
}
});
}
latch.await();
// 모든 요청이 끝나고 난 후 쿠폰의 개수 카운팅..
long count = couponRepository.count();
// 원래 의도대로 라면 100이 나와야 함..
assertThat(count).isEqualTo(100);
}
테스트코드는 통과할 수 있을까 ?
원래의 의도대로 라면 100장이 발급되어야 하지만 1000번의 요청이 들어오니 122개의 쿠폰이 발급 되었다.
이유는 왜일까 ?
이라는게 발생 했기 때문이다.. 그렇다면 레이스 컨디션이란 ?
두 개 이상의 프로세스 혹은 스레드가 공유 자원을 서로 사용하려고 경합(Race)하는 현상을 의미한다.
위와 같이 첫번째 쓰레드가 99번째에서 아직 100개가 아니므로 쿠폰을 발급하고 count 가 100이 되면 다른 쓰레드에서 발급을 시도 했을 때 실패하는걸 예상 할 것이다.
하지만 실제로는 ?
쓰레드 1에서 99번째에서 100번째 쿠폰을 발급하기 전에 다른 쓰레드에서 쿠폰의 개수를 가져간다. 따라서 쓰레드2도 새로운 쿠폰을 발급하게 되는 것 이다.
레이스 컨디션은 두개 이상의 쓰레드에서 공유자원에 엑세스하므로 발생되는 것이기에 싱글 쓰레드로 작업을 한다.
java - synchronized
mysql & redis 를 활용한 lock
하지만 내가 원하는 건 쿠폰의 개수에 대한 정합성이다. 하지만 lock 을 활용하면 ?
발급된 쿠폰개수 부터 쿠폰 발행까지 락을 걸어야 한다.
락을 거는 구간이 길어져서 성능 이슈가 생길 가능성이 있음.
@Repository
@RequiredArgsConstructor
public class CouponCountRepository {
private final RedisTemplate<String, String> redisTemplate;
public Long increment() {
return redisTemplate.opsForValue().increment("coupon_count");
}
}
@Service
@RequiredArgsConstructor
public class CouponService {
private final CouponRepository couponRepository;
private final CouponCountRepository couponCountRepository;
public void publishCoupon(Long userId) {
// 발급전에 발급 된 쿠폰 개수 증가..
Long count = couponCountRepository.increment();
if (count > 100) {
System.out.println("100보다 커졋어");
return ;
}
couponRepository.save(Coupon.builder().userId(userId).build());
}
}
테스트 코드도 통과하였고 DB에도 딱 100개의 쿠폰만 발급이 가능하게 되었다.
현재의 로직은 쿠폰 발급 요청이 들어오면 redis 를 활용해서 발급된 쿠폰의 개수를 가져온 후에 발급이 가능하다면 RDB 에 저장하는 방식이다.
하지만 발급하는 쿠폰의 개수가 많아질수록 RDB에 부하를 주게 된다.
만약 사용하는 RDB가 쿠폰전용 DB가 아니라면 다른 서비스에도 영향을 줄 수 있는 가능성이 있다.
10:00 - 쿠폰 10,000개 발급 요청
10:01 - 주문생성 요청
10:02 - 회원가입 요청
만약 위 처럼 요청이 들어온다면 ?
1분에 100개의 insert만 가능하다고 가정 했기 때문에 10:00에 쿠폰 10,000개를 위한 100분이 소요되고 그 이후에 작업이 진행된다.
작업이 느리게 라도 진행이 되면 다행이지 보통은 타임아웃에 걸리게 되면서 주문생성,회원가입 뿐 아니라 10,000개의 쿠폰도 완전히 생성 되지 않을 수 있음.
또한 단순간에 많은 요청이 들어오게 되면 DB서버의 리소스를 많이 사용하게 되므로 부하가 발생하고 이는 곧 서비스 지연 및 에러를 발생시킨다.
하지만 단순간에 많은 요청이 들어온다면 Redis 서버에 부하가 크게 발생할 수 있다.
Redis는 메모리 내에서 데이터를 저장하고 처리하기 때문에, 대량의 요청이 동시에 들어오면 메모리 사용량이 급증하고 CPU 자원도 많이 사용될 것입니다. 이로 인해 Redis 서버의 응답 시간이 느려지거나, 서버가 부하로 인해 응답하지 못할 수 있습니다.
우리의 DB 서버 부하를 줄이면서 kafka 사용으로 성능을 높일 수 있음.
메시지 큐(Queue) 방식의 시스템으로 설계되어 있고, 메시지 순서 보장으로 인한 DB 동시성 이슈 해결.
Kafka는 비동기 메시징 시스템으로 메시지를 브로커에 전송하고 나중에 컨슈머가 메시지를 처리합니다. 이로 인해 요청을 즉시 처리하지 않아도 되며, 처리할 때까지 대기하지 않고 다른 작업을 수행할 수 있습니다.
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Long> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Long> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Component
@RequiredArgsConstructor
public class CouponCreateProducer {
private final KafkaTemplate<String, Long> kafkaTemplate;
public void create(Long userId) {
kafkaTemplate.send("coupon_create", userId);
}
}
@Service
@RequiredArgsConstructor
public class CouponService {
private final CouponRepository couponRepository;
private final CouponCountRepository couponCountRepository;
private final CouponCreateProducer couponCreateProducer;
public void publishCoupon(Long userId) {
// 발급전에 발급 된 쿠폰 개수 증가..
Long count = couponCountRepository.increment();
if (count > 100) {
return ;
}
couponCreateProducer.create(userId);
//couponRepository.save(Coupon.builder().userId(userId).build());
}
}
아까의 로직에서 RDB save 대신 kafka 설정과 producer로 변경하였음.
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Long> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Component
@RequiredArgsConstructor
public class CouponCreateConsumer {
private final CouponRepository couponRepository;
@KafkaListener(topics = "coupon_create", groupId = "group_1")
public void listener(Long userId) {
couponRepository.save(Coupon.builder().userId(userId).build());
}
}
consumer 설정 후 토픽에서 데이터를 받아올 때 rdb에 save 하였음.
아래는 kafka 추가 후 테스트 코드 이다.
@Test
public void 여러명응모() throws InterruptedException {
// 동시에 여러개의 요청을 보낼것이기 때문에 멀티쓰레드 사용
// 1000개의 요청을 보낼 것 임.
int threadCount = 1000;
// 멀티쓰레드를 사용할 것 이기 때문에 ExecutorService 사용
// ExecutorService -> 병렬작업을 간단하게 할 수 있게 해주는 java api
ExecutorService executorService = Executors.newFixedThreadPool(32);
// 모든 요청이 끝날 때 까지 기다려야 하므로 CountDownLatch 사용..
// CountDownLatch 는 다른 쓰레드에서 사용하는 작업을 기다리도록 도와주는 class..
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
long userId = i+1;
executorService.submit(() -> {
try {
couponService.publishCoupon(userId);
} finally {
latch.countDown();
}
});
}
latch.await();
Thread.sleep(10000);
// 모든 요청이 끝나고 난 후 쿠폰의 갯수 카운팅..
long count = couponRepository.count();
// 원래 의도대로 라면 100이 나와야 함..
assertThat(count).isEqualTo(100);
}
producer가 topic으로 데이터를 던지고 , consumer 가 전부 다 data를 받아올 때 까지의 시간보다 테스트 코드가 더 먼저 실행이 되어서, 자꾸 통과되지 못했다. 그래서 sleep을 10초 걸어주었음.
정확히 100개만 발급이 완료되었다.
Coupon entity 에서 userId와 conponType 필드를 추가해서 유니크 키를 주고 db 에서 막는 방법.
db lock
비즈니스 로직에서 시작부분에 lock을 걸고 발급 유무 체크를 한 뒤 됐으면 return 해주고 안 되었으면 발급 요청을 하고 producer가 데이터를 전송하고 나서 unlock을 한다.
이 방법은 kafka producer는 topic 으로 data 전송만 하는 역할을 한다. 그리고 consumer는 topic 에서 data 를 받고 save 할 준비를 하는데, 그 작업이 되기전에 다시한번 쿠폰 발급 요청이 온다면?
아직 consumer 에서 save 하지 않았기 때문에 2개의 쿠폰이 발급이 될 수 있다.
kafka 를 사용하지 않고 http 통신을 한다고 해도 lock 범위가 너무 넓어진다. 고로 성능이슈가 생길 가능성이 높음.
Set 사용
sadd 명령을 통해 key(test) , value(1) 을 삽입한 모습
같은 key 로 삽입하니 중복이기 때문에 추가 된 value가 없어서 0을 return 하는 모습
아래의 코드로 redis 에서 set을 사용하기 위한 repository 를 만들고..
@Repository
@RequiredArgsConstructor
public class PublishedUserRepository {
private final RedisTemplate<String, String> redisTemplate;
public Long add(Long userId) {
return redisTemplate.opsForSet().add("published_user", userId.toString());
}
}
public void publishCoupon(Long userId) {
Long add = publishedUserRepository.add(userId);
// 이미 쿠폰을 발급 받음
if (add != 1) {
return ;
}
// 발급전에 발급 된 쿠폰 개수 증가..
Long count = couponCountRepository.increment();
if (count > 100) {
return ;
}
couponCreateProducer.create(userId);
}
@Test
public void 한명당_한개의쿠폰만_발급() throws InterruptedException {
// 동시에 여러개의 요청을 보낼것이기 때문에 멀티쓰레드 사용
// 1000개의 요청을 보낼 것 임.
int threadCount = 1000;
// 멀티쓰레드를 사용할 것 이기 때문에 ExecutorService 사용
// ExecutorService -> 병렬작업을 간단하게 할 수 있게 해주는 java api
ExecutorService executorService = Executors.newFixedThreadPool(32);
// 모든 요청이 끝날 때 까지 기다려야 하므로 CountDownLatch 사용..
// CountDownLatch 는 다른 쓰레드에서 사용하는 작업을 기다리도록 도와주는 class..
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
couponService.publishCoupon(1L);
} finally {
latch.countDown();
}
});
}
latch.await();
Thread.sleep(10000);
// 모든 요청이 끝나고 난 후 쿠폰의 갯수 카운팅..
long count = couponRepository.count();
// 원래 의도대로 라면 1이 나와야 함..
assertThat(count).isEqualTo(1);
}
정상적으로 테스트코드가 통과하고 1개의 data만 적재됨을 확인할 수 있다.
따라서 에러발생시 백업 data 와 log 를 남겨보도록 하겠다.
@Entity
@AllArgsConstructor @NoArgsConstructor(access = AccessLevel.PROTECTED) @Builder
@Data
public class FailedEvent {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long userId;
}
@KafkaListener(topics = "coupon_create", groupId = "group_1")
public void listener(Long userId) {
try {
couponRepository.save(Coupon.builder().userId(userId).build());
} catch (Exception e) {
log.error("failed to create coupon :: " + userId);
failedEventRepository.save(FailedEvent.builder().userId(userId).build());
}
}
이후에 db 에 적재된 data를 다시 발급하는 배치프로그램을 작성하면 100개의 쿠폰이 전부 발급이 될 것이다.
평소에 알고는 있어도 100% 내것으로 만들고 사용하지 못했던 것 같았고, 이번에 kafka를 배우면서 다시한번 개념확립을 위해 진행했던 프로젝트이다.
동시성 문제에 대해서는 앞으로도 적용할 곳이 수두룩하니 완벽이해가 필요했고 목적을 달성한 프로젝트 였다.