플랫폼에서는 선착순으로 쿠폰을 제공하는 이벤트를 한다.
수많은 이용자가 동시에 쿠폰 발급을 누르는 상황이 발생한다.
이때 동시성 문제가 발생할 수 있다. 플랫폼에서 쿠폰 1000개만 제공하려고 했고, 1000개 이상의 쿠폰이 발급되지 않게끔 로직을 구현하더라도 쿠폰이 1100개가 뿌려질 수 있다.
동시성 문제를 해결할 때 Redis와 Kafka를 활용할 수 있다.
단일 스레드인 Redis는 요청이 많이 몰려도 작업을 순차적으로 실행한다. 제한된 물량이 다 사라졌을 때 더이상의 작업을 하지 않게 된다.
public CouponCountRepository(RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public Long increment() {
return redisTemplate
.opsForValue()
.increment("coupon_count");
}
public void apply(Long userId) {
long count = couponCountRepository.increment();
//쿠폰의 개수
if (count > 100) {
return;
}
couponRepository.save(new Coupon(userId));
}
다만, Redis로 대규모 트래픽을 감당하기는 어렵다. 요청이 너무 많이 몰리면 서비스 지연이나 성능 저하가 올 수 있다.
카프카는 실시간으로 기록 스트림을 게시, 구독, 저장 처리하는 분산 데이터 스트리밍 플랫폼이다. 메신지 발신자가 구독자라고 하는 특정 수신자에게 직접 메시지를 보낸다.
브로커를 통해서 메시지를 발행하고, 메시지를 구독할 컨슈머가 브로커에게 요청해서 메시지를 가져간다.
카프카는 토픽이라는 곳에 데이터를 추가하고 컨슈머에서 차례대로 메시지를 가져간다. 컨슈머는 토피에 있는 데이터 1개를 가져와서 쿠폰을 발급하고, 다음 데이터를 갖고온다.
속도가 빠르지는 않을 것이다. 다만, 이처럼 쿠폰을 발급할 때는 속도보다는 정확도가 거 더 중요하다.
쿠폰 요청을 받는 서버에서 바로 발급까지 하면 빠르게 쿠폰을 발급할 수 있지만, 동시성 문제가 발생할 수 있다. 쿠폰 발급 서버의 계층을 분리해도 마찬가지다.
이때, 카프카에 메세지를 보내고, 컨슈머가 차례대로 이를 가져가는 방식을 쓰면 처리량을 1초에 한번씩 가게 조절할 수 있다. 레디스를 쓸 때처럼 처리량이 몰려서 과부하가 심해지는 문제를 피할 수 있는 것이다.
카프카를 순서를 보장하는 메시지 큐로 사용해서, 동시성 문제도 해결할 수 있다.
@EnableKafka
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
//메시지 송수신이 안 되면 카프카 버전과 스프링의 카프카, 스프링 부트가 제대로 호환되는 버전인지도 확인 해보자.
}
도커 컴포즈 파일을 작성한다.
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
restart: unless-stopped
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
restart: unless-stopped
카프카 시작
docker exec -it 컨테이너 id bash
cd /opt/kafka/bin
토픽 목록 조회
./kafka-topics.sh --bootstrap-server localhost:9092 --list
토픽 생성
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic coupon_create
프로듀서 생성
./kafka-console-producer.sh --broker-list localhost:9092 --topic coupon_create
컨슈머 생성
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic coupon_create --from-beginning
토픽 삭제
./kafka-topics.sh --delete --topic <토픽_이름> --bootstrap-server localhost:9092
3) application.properties 파일
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
logging.level.org.apache.kafka=info
logging.level.org.springframework.kafka=info
4) 리스너 네트워크 주석 처리 해제
apt-get update
apt-get install nano
cd /opt/kafka/config
nano server.properties
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://your_public_ip(localhost):9092
토픽을 삭제하려면 저 설정파일에 delete.topic.enable=true 추가하고
exit
docker restart kafka id
변경 사항을 저장하고 nano를 종료하려면, CTRL + X를 누르고, Y를 누른 후 ENTER를 누른다.
1)메시지 송수신이 안 될 때?
클래스에 @Configuration, @Component 어노테이션이 붙어 있는지 확인하자. 프로듀서와 컨슈머에 어노테이션이 안 붙어있는 걸 모르고 엄청나게 헤맸다...
2)configuration 이름 주의
@Configuration
public class ProducerConfig {
@Bean
public ProducerFactory<String, Long> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, Long> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
ProducerConfig를 다른 이름으로 변경해야 중복 문제가 사라짐
3)토픽 이름 틀리지 않게 주의
-->자꾸 coupon_create랑 coupon-create처럼 동일하지 않아서 메시지가 안감
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Long> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, "group1");
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Long> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, Long> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Component
@RequiredArgsConstructor
@Slf4j
public class CouponCreateProducer {
private final KafkaTemplate<String, Long> kafkaTemplate;
public void create(Long userId) {
log.info("메시지 보냄" + userId);
CompletableFuture<SendResult<String, Long>> message = kafkaTemplate.send("coupon_create", userId);
message.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
} else {
System.err.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
}
});
}
}
@Component
@RequiredArgsConstructor
@Slf4j
public class CouponCreateConsumer {
private final CouponRepository couponRepository;
@KafkaListener(topics = "coupon_create", groupId = "group1")
public void listener(Long userId) {
log.info("메시지 받음" + userId);
couponRepository.save(new Coupon(userId));
}
}
레디스 설정
@Repository
@RequiredArgsConstructor
public class CouponCountRepository {
private final RedisTemplate<String, String> redisTemplate;
public Long increment() {
return redisTemplate
.opsForValue()
.increment("coupon_count");
}
}
@Service
@RequiredArgsConstructor
@Transactional
public class CouponService {
private final CouponCountRepository couponCountRepository;
private final CouponCreateProducer couponCreateProducer;
public void apply(Long userId) {
long count = couponCountRepository.increment();
//쿠폰의 개수
if (count > 100) {
return;
}
couponCreateProducer.create(userId);
}
//테스트를 해도 실패할 수 있음
}