Kafka Spring 활용 [1] Producer, Consumer

최준호·2022년 3월 19일
0

Microservice Architecture

목록 보기
27/32
post-thumbnail

📘Kafka Spring에서 활용하기

Kafka의 동작 구조를 살펴봤으니 실제로 Spring Cloud에서는 어떻게 사용하는지 직접 만들어서 사용해보자.

우리가 지금까지 만들었던 catalog-service와 order-service를 사용하여 주문이 발생했을 때 catalog-service에 존재하는 재고 qty의 값을 줄여주는 로직을 kafka의 메세지를 주고 받음으로써 처리하는 로직을 만들어보려고 한다.

🔨Catalog Service (Consumer) 수정

<!-- Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

kafka 의존성을 추가해주고

@EnableKafka    //kafka 설정 추가
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory(){
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");  //kafka 실행 서버 ip
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");          //Consumer들을 그룹핑 할수 있다.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);     //KEY 값을 String de serializer로 지정
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);  //VALUE 값을 String de serializer로 지정

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());    //위에서 설정한 consumerFactory를 설정해줌

        return kafkaListenerContainerFactory;
    }
}

src 아래 messagequeue 패키지를 생성하여 KafkaConsumerConfig 파일을 만들어 Kafka 설정값을 추가해주었다. Catalog는 Order에서 발생된 메세지를 읽어서 사용하는 쪽(Consumer)이므로 DESERIALIZER로 세팅해주었다.

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
    private final CatalogRepository repository;

    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage){
        log.info("kafka message = {}", kafkaMessage);

        //kafka 메세지 역 직렬화
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();

        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        CatalogEntity entity = repository.findByProductId(map.get("productId").toString());
        if(entity != null){
            entity.setStock(entity.getStock() - (Integer)map.get("qty"));
            //update
            repository.save(entity);
        }
    }
}

그리고 @KafkaListener 어노테이션을 사용하여 example-catalog-topic 토픽에 대해 대기하도록 등혹간 뒤 ObjectMapper를 사용하여 json 데이터를 파싱한 후 로직을 실행하도록 코드를 작성한다.

🔨Order Service (Producer) 수정

<!-- Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

kafka 의존성을 동일하게 추가해주고

@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory(){
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");  //kafka 실행 서버 ip
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);     //KEY 값을 String serializer로 지정
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  //VALUE 값을 String serializer로 지정

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

위와는 반대로 Producer로써의 설정으로 추가해준다. 거의 동일하지만 모든 설정이 Consumer가 아닌 Producer인것을 확인하고 코드를 작성하자!

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderDto orderSend(String topic, OrderDto orderDto){
        ObjectMapper mapper = new ObjectMapper();
        //json format으로 변경
        String json = "";
        try {
            json = mapper.writeValueAsString(orderDto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        //kafka 메세지 전송
        kafkaTemplate.send(topic, json);
        log.info("Kafka Producer send data from the order service = {}",orderDto);

        return orderDto;
    }
}

그리고 topic과 OrderDto 객체를 매개변수로 전달받아 OrderDto는 json 형태로 변환해주고 KafkaTemplatesend()를 통해서 메세지를 전달하도록 orderSend를 작성해준다.

@RestController
@RequestMapping("/order-service")
@RequiredArgsConstructor
public class OrderController {
    private final Environment env;
    private final OrderService orderService;
    private final KafkaProducer kafkaProducer;  //kafka producer 주입

   ...
   
    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder){
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        //기존의 jpa 로직
        OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
        orderDto.setUserId(userId);
        OrderDto createOrder = orderService.createOrder(orderDto);
        ResponseOrder responseOrder = mapper.map(createOrder, ResponseOrder.class);

        //kafka 로직 추가
        kafkaProducer.orderSend("example-catalog-topic", orderDto);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }
}

Controller에서 기존의 내부 로직만 실행하던 부분에 kafka 메세지 전달 로직을 추가로 작성해준다.

👏Test 해보기

실행에 앞서 Config server, Eureka Server, Kafka Server, Kafka zookeaper Server, Gateway Server 모두 실행해준 뒤 서비스를 실행시키자

./bin/windows/kafka-server-start.bat ./config/server.properties
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties

그 후에 Catalog service에서 DB를 확인하면 초기 데이터로 세팅되어 있는 것을 확인할 수 있다.

Order Service가 실행되었고 정상 return을 받았다.

user-service를 실행하여 정상적인 로그인 후 아이디 값을 받지 않았지만 주문만 테스트하고 주문시에 user를 체크하는 로직이 없기 때문에 정상적으로 실행된다.

order-service에서 정상적으로 메세지를 전송했다.

catalog-service에서도 메세지를 정상적으로 수신했고

주문 가능 수량도 90으로 줄어든 것을 확인할 수 있다.

profile
해당 주소로 이전하였습니다. 감사합니다. https://ililil9482.tistory.com

0개의 댓글