// Repository layer
/**
 * Redis-backed counter used to track how many coupons have been requested.
 */
@Repository
public class CouponCountRepository {

    /** Redis key under which the running coupon count is stored. */
    private static final String COUNT_KEY = "coupon_count";

    private final RedisTemplate<String, String> redisTemplate;

    public CouponCountRepository(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Increments the counter stored under {@code coupon_count}.
     *
     * @return the value reported by Redis for the increment operation
     */
    public Long increment() {
        return redisTemplate.opsForValue().increment(COUNT_KEY);
    }
}
// Service layer
/**
 * Handles coupon applications: bumps the shared counter and persists a coupon
 * directly while the counter has not exceeded the limit.
 */
@Service
public class ApplyService {

    private final CouponRepository couponRepository;
    private final CouponCountRepository couponCountRepository;

    public ApplyService(CouponRepository couponRepository,
                        CouponCountRepository couponCountRepository) {
        this.couponRepository = couponRepository;
        this.couponCountRepository = couponCountRepository;
    }

    /**
     * Applies for a coupon on behalf of the given user.
     *
     * <p>The counter is incremented on every call; a coupon is saved only when the
     * incremented value is at most 100.
     *
     * @param userId id of the applying user
     */
    public void apply(Long userId) {
        Long requestedSoFar = couponCountRepository.increment();

        // Only the first 100 increments result in a stored coupon.
        if (requestedSoFar <= 100) {
            couponRepository.save(new Coupon(userId));
        }
    }
}
// Add dependency (build.gradle): implementation 'org.springframework.kafka:spring-kafka'
// Producer configuration
/**
 * Declares the Kafka producer beans: String keys and Long values, sent to the
 * broker at {@code localhost:9092}.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Builds the producer factory with the bootstrap server and the key/value serializers.
     *
     * @return a producer factory for {@code String} keys and {@code Long} values
     */
    @Bean
    public ProducerFactory<String, Long> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Exposes a {@link KafkaTemplate} backed by {@link #producerFactory()}.
     *
     * @return the template used to publish messages
     */
    @Bean
    public KafkaTemplate<String, Long> kafkaTemplate() {
        ProducerFactory<String, Long> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
// Producer implementation
/**
 * Publishes coupon-creation requests to Kafka.
 */
@Component
public class CouponCreateProducer {

    /** Topic that receives the ids of users who should get a coupon. */
    private static final String TOPIC = "coupon_create";

    private final KafkaTemplate<String, Long> kafkaTemplate;

    public CouponCreateProducer(KafkaTemplate<String, Long> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends the user id as the message value to the {@code coupon_create} topic.
     *
     * @param userId id of the user a coupon should be created for
     */
    public void create(Long userId) {
        kafkaTemplate.send(TOPIC, userId);
    }
}
// Service layer
/**
 * Handles coupon applications: bumps the shared counter and, while the counter
 * has not exceeded the limit, hands the request to the Kafka producer instead
 * of saving the coupon itself.
 */
@Service
public class ApplyService {

    private final CouponRepository couponRepository;
    private final CouponCountRepository couponCountRepository;
    private final CouponCreateProducer couponCreateProducer;

    public ApplyService(CouponRepository couponRepository,
                        CouponCountRepository couponCountRepository,
                        CouponCreateProducer couponCreateProducer) {
        this.couponRepository = couponRepository;
        this.couponCountRepository = couponCountRepository;
        this.couponCreateProducer = couponCreateProducer;
    }

    /**
     * Applies for a coupon on behalf of the given user.
     *
     * <p>The counter is incremented on every call; a creation message is published
     * only when the incremented value is at most 100.
     *
     * @param userId id of the applying user
     */
    public void apply(Long userId) {
        Long requestedSoFar = couponCountRepository.increment();

        // Only the first 100 increments result in a published creation request.
        if (requestedSoFar <= 100) {
            couponCreateProducer.create(userId);
        }
    }
}
// Consumer configuration
/**
 * Declares the Kafka consumer beans: String keys and Long values, consumer
 * group {@code group_1}.
 */
@Configuration
public class KafkaConsumerConfig {

    /**
     * Builds the consumer factory with the bootstrap server, group id and the
     * key/value deserializers.
     *
     * @return a consumer factory for {@code String} keys and {@code Long} values
     */
    @Bean
    public ConsumerFactory<String, Long> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // NOTE(review): the host is spelled "localHost" here but "localhost" in the
        // producer configuration — confirm whether the spelling should be unified.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localHost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Creates the listener container factory wired to {@link #consumerFactory()}.
     *
     * @return the container factory for {@code String}/{@code Long} listeners
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Long> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }
}
// Reads messages from the topic and stores them
/**
 * Kafka listener that turns each message on {@code coupon_create} into a
 * persisted {@code Coupon}.
 */
@Component
public class CouponCreatedConsumer {

    private final CouponRepository couponRepository;

    public CouponCreatedConsumer(CouponRepository couponRepository) {
        this.couponRepository = couponRepository;
    }

    /**
     * Receives a user id from the {@code coupon_create} topic (consumer group
     * {@code group_1}) and saves a coupon for that user.
     *
     * @param userId message value: id of the user the coupon belongs to
     */
    @KafkaListener(topics = "coupon_create", groupId = "group_1")
    public void listener(Long userId) {
        Coupon coupon = new Coupon(userId);
        couponRepository.save(coupon);
    }
}
// Repository layer
/**
 * Redis set holding the ids of users who have already applied.
 */
@Repository
public class AppliedUserRepository {

    /** Redis key of the set of applied user ids. */
    private static final String APPLIED_USER_KEY = "applied_user";

    private final RedisTemplate<String, String> redisTemplate;

    public AppliedUserRepository(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Adds the user id, in its string form, to the {@code applied_user} set.
     *
     * @param userID id of the applying user
     * @return the result of the Redis set-add operation
     */
    public Long add(Long userID) {
        String member = userID.toString();
        return redisTemplate.opsForSet().add(APPLIED_USER_KEY, member);
    }
}
// Service layer
/**
 * Handles coupon applications with a per-user check: a user is first recorded
 * in the applied-user set, then the shared counter is bumped, and a creation
 * message is published while the counter has not exceeded the limit.
 */
@Service
public class ApplyService {

    private final CouponRepository couponRepository;
    private final CouponCountRepository couponCountRepository;
    private final CouponCreateProducer couponCreateProducer;
    private final AppliedUserRepository appliedUserRepository;

    public ApplyService(CouponRepository couponRepository,
                        CouponCountRepository couponCountRepository,
                        CouponCreateProducer couponCreateProducer,
                        AppliedUserRepository appliedUserRepository) {
        this.couponRepository = couponRepository;
        this.couponCountRepository = couponCountRepository;
        this.couponCreateProducer = couponCreateProducer;
        this.appliedUserRepository = appliedUserRepository;
    }

    /**
     * Applies for a coupon on behalf of the given user.
     *
     * <p>Nothing further happens unless adding the user to the applied-user set
     * returns 1. Otherwise the counter is incremented and a creation message is
     * published only when the incremented value is at most 100.
     *
     * @param userId id of the applying user
     */
    public void apply(Long userId) {
        Long addedMembers = appliedUserRepository.add(userId);

        // Proceed only when the set-add reported exactly one added member.
        if (addedMembers == 1) {
            Long requestedSoFar = couponCountRepository.increment();

            // Only the first 100 increments result in a published creation request.
            if (requestedSoFar <= 100) {
                couponCreateProducer.create(userId);
            }
        }
    }
}
/**
 * Kafka listener that turns each message on {@code coupon_create} into a
 * persisted {@code Coupon}, recording a {@code FailedEvent} when saving fails.
 */
@Component
public class CouponCreatedConsumer {

    private static final Logger log = LoggerFactory.getLogger(CouponCreatedConsumer.class);

    private final CouponRepository couponRepository;
    private final FailedEventRepository failedEventRepository;

    public CouponCreatedConsumer(CouponRepository couponRepository,
                                 FailedEventRepository failedEventRepository) {
        this.couponRepository = couponRepository;
        this.failedEventRepository = failedEventRepository;
    }

    /**
     * Receives a user id from the {@code coupon_create} topic (consumer group
     * {@code group_1}) and saves a coupon for that user.
     *
     * <p>If saving throws, the failure is logged together with its cause and a
     * {@code FailedEvent} for the user is stored instead.
     *
     * @param userId message value: id of the user the coupon belongs to
     */
    @KafkaListener(topics = "coupon_create", groupId = "group_1")
    public void listener(Long userId) {
        try {
            couponRepository.save(new Coupon(userId));
        } catch (Exception e) {
            // Pass the exception as the last argument so the cause and stack trace
            // are logged; the rendered message text is unchanged.
            log.error("failed to create coupon ::{}", userId, e);
            failedEventRepository.save(new FailedEvent(userId));
        }
    }
}