[Spring Cloud] 서비스에 Kafka Topic 적용

jsieon97·2023년 3월 20일
0

Order, Catalog 서비스에 적용

  • OrderService에 요청 된 주문의 수량 정보를 CatalogService에 반영
  • OrderService에서 Kafka Topic으로 메시지 전송 -> Producer
  • CatalogService에서 Kafka Topic에 전송 된 메시지 취득 -> Consumer

CatalogService

Dependencies

// build.gradle

implementation 'org.springframework.kafka:spring-kafka'

KafkaConsumer

// KafkaCunsumerConfig.java

// Kafka consumer 설정을 위한 Configuration 클래스
@Configuration
@EnableKafka // Kafka를 사용하기 위해 Spring Kafka를 활성화함
public class KafkaConsumerConfig {

    // Kafka consumer를 생성하기 위한 ConsumerFactory를 생성하는 Bean 메서드
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // Kafka consumer 설정을 위한 Properties 객체 생성
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // Kafka broker의 주소와 포트 설정
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "127.0.0.1:9092"); // consumer group ID 설정
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // key deserializer 설정
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // value deserializer 설정

        // Properties 객체를 사용하여 ConsumerFactory 생성
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    // Kafka listener container를 생성하기 위한 Bean 메서드
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // ConcurrentKafkaListenerContainerFactory 생성
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();

        // ConsumerFactory를 사용하여 Kafka listener container를 생성함
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }
}
// KafkaConsumer.java

@Service
@Slf4j
public class KafkaConsumer {
    private CatalogRepository repository;

    // 생성자를 통해 CatalogRepository bean을 주입받음
    @Autowired
    public KafkaConsumer(CatalogRepository repository) {
        this.repository = repository;
    }

    // KafkaListener annotation을 통해 example-catalog-topic의 메시지를 수신함
    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage) {
        log.info("Kafka Message: ->", kafkaMessage);

        // JSON 데이터를 Map으로 파싱함
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        // productId를 기반으로 CatalogEntity를 찾음
        CatalogEntity entity = repository.findByProductId((String) map.get("productId"));

        // CatalogEntity가 존재하면 해당 제품의 재고를 업데이트함
        if(entity != null) {
            entity.setStock(entity.getStock() - (Integer) map.get("qty"));
            repository.save(entity);
        }
    }
}

OrderSerivce

Dependencies

// build.gradle

implementation 'org.springframework.kafka:spring-kafka'

KafkaProducer

// KafkaProducerConfig.java

// Kafka producer 설정을 위한 Configuration 클래스
@Configuration
@EnableKafka // Kafka를 사용하기 위해 Spring Kafka를 활성화함
public class KafkaProducerConfig {

    // Kafka producer를 생성하기 위한 ProducerFactory를 생성하는 Bean 메서드
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // Kafka producer 설정을 위한 Properties 객체 생성
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // Kafka broker의 주소와 포트 설정
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // key serializer 설정
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // value serializer 설정

        // Properties 객체를 사용하여 ProducerFactory 생성
        return new DefaultKafkaProducerFactory<>(properties);
    }

    // KafkaTemplate을 생성하기 위한 Bean 메서드
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        // ProducerFactory를 사용하여 KafkaTemplate 생성
        return new KafkaTemplate<>(producerFactory());
    }
}
// KafkaProducer.java

// Kafka 메시지를 생성하고 보내기 위한 Kafka Producer 서비스 클래스
@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    // KafkaTemplate 객체를 생성자를 통해 주입 받음
    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // 주어진 주제(topic)로 OrderDto 객체를 Kafka 메시지로 전송하고, 전송한 OrderDto 객체를 반환하는 메서드
    public OrderDto send(String topic, OrderDto orderDto) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInstring = "";
        try {
            jsonInstring = mapper.writeValueAsString(orderDto); // OrderDto 객체를 JSON 문자열로 변환
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInstring); // KafkaTemplate을 사용하여 Kafka 메시지를 보냄
        log.info("Kafka Producer sent data from the Order microservice: " + orderDto);

        return orderDto; // 전송한 OrderDto 객체를 반환
    }
}

OrderController

// OrderController.java

@RestController
@RequestMapping("/order-service")
public class OrderController {
    private Environment env;
    private OrderService orderService;
    private KafkaProducer kafkaProducer;

    public OrderController(Environment env,
                           OrderService orderService,
                           KafkaProducer kafkaProducer) {
        this.env = env;
        this.orderService = orderService;
        this.kafkaProducer = kafkaProducer;
    }

    ...

    // http://127.0.0.1:0/order-service/{user_id}/orders/
    // 사용자 ID와 주문 정보(orderDetails)를 받아 새 주문을 생성하는 REST API 엔드포인트
    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                     @RequestBody RequestOrder orderDetails) {
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        /* JPA */
        // RequestOrder 객체를 OrderDto 객체로 변환하여 userId를 추가
        OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
        orderDto.setUserId(userId);
        // OrderDto 객체를 사용하여 주문 생성
        OrderDto createdOrder = orderService.createOrder(orderDto);

        // 생성된 OrderDto 객체를 ResponseOrder 객체로 변환
        ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);

        /* send this order to the kafka */
        // KafkaProducer 객체를 사용하여 주문 정보를 Kafka 메시지로 전송
        kafkaProducer.send("example-catalog-topic", orderDto);

        // 생성된 ResponseOrder 객체를 HttpStatus.CREATED 상태코드와 함께 반환
        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }

    ...
    
}

테스트

ZookeeperServer, KafkaServer, EurekaServer, ConfigService, API Gateway 실행

개수만큼 stock 감소된 것을 볼 수 있다. (동기화 처리)

profile
개발자로써 성장하는 방법

0개의 댓글