Rental Application (React & Spring boot Microservice) - 17 : rental-service(3)

yellow_note · August 31, 2021

#1 Scenario

Before applying Kafka, let's walk through the scenario for creating a rental.

  • Creating a rental: A user requests a rental through post-service. post-service then sends a rental request message to the Kafka message broker. Finally, rental-service reads the message from the broker, processes the rental, and returns the rental information.
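To make the flow concrete, here is a toy sketch in plain Java. It is not Kafka: an in-memory queue stands in for the topic, and the class and method names (RentalFlowSketch, requestRental, consumePending) are hypothetical. It only illustrates that the producer side returns as soon as the message is queued, and that the rental is stored later when the consumer side reads it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of the scenario; a queue stands in for the Kafka topic "rental-topic".
final class RentalFlowSketch {
    // Stand-in for the broker topic (a real Kafka topic is a persistent, partitioned log).
    private final BlockingQueue<String> rentalTopic = new LinkedBlockingQueue<>();
    // Stand-in for the rentals table that rental-service writes to.
    private final List<String> savedRentals = new ArrayList<>();

    // post-service side: publish the rental request and return immediately.
    void requestRental(String jsonMessage) {
        rentalTopic.add(jsonMessage);
    }

    // rental-service side: read every pending message and "save" it.
    // Returns the number of messages handled in this call.
    int consumePending() {
        int handled = 0;
        String message;
        while ((message = rentalTopic.poll()) != null) {
            savedRentals.add(message);
            handled++;
        }
        return handled;
    }

    List<String> savedRentals() {
        return savedRentals;
    }
}
```

Nothing is saved until consumePending() runs, which mirrors how rental-service only creates the rental once it has read the message from the broker.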

With this scenario in mind, let's apply Kafka to the application.

#2 Applying Kafka

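Let's implement the consumer first. A consumer subscribes to the message broker and reads messages as they arrive on the topics it subscribes to. Between post-service and rental-service, rental-service is the one that must receive the message and process the rental, so it plays the consumer role.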

  • rental-service - pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • message.KafkaConsumerConfig
package com.microservices.rentalservice.message;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();

        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }
}
  • KafkaConsumer
package com.microservices.rentalservice.message;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microservices.rentalservice.entity.RentalEntity;
import com.microservices.rentalservice.repository.RentalRepository;
import com.microservices.rentalservice.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Service
@Slf4j
public class KafkaConsumer {
    RentalRepository rentalRepository;

    @Autowired
    public KafkaConsumer(RentalRepository rentalRepository) {
        this.rentalRepository = rentalRepository;
    }

    @KafkaListener(topics="rental-topic")
    public void requestRental(String kafkaMessage) {
        log.info("Kafka Message : " + kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();

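        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch(JsonProcessingException ex) {
            // Skip malformed messages instead of building an entity from an empty map
            log.error("Failed to parse Kafka message", ex);
            return;
        }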

        RentalEntity rentalEntity = RentalEntity.builder()
                                                .rentalId(UUID.randomUUID().toString())
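                                                // RequestRental declares postId as Long, so it may arrive as a JSON number; String.valueOf accepts both numbers and strings
                                                .postId(Long.parseLong(String.valueOf(map.get("postId"))))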
                                                .owner((String)map.get("owner"))
                                                .borrower((String)map.get("borrower"))
                                                .price(Long.parseLong((String) map.get("price")))
                                                .startDate((String)map.get("startDate"))
                                                .endDate((String)map.get("endDate"))
                                                .createdAt(DateUtil.dateNow())
                                                .build();

        rentalRepository.save(rentalEntity);
    }

}
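Values read from the Map<Object, Object> can arrive either as JSON numbers or as strings, depending on how the producer's fields are typed (RequestRental declares postId as Long but price as String). A minimal sketch of a helper that accepts both forms and fails with a clear message when a field is missing is shown below. The class name PayloadValues and the method requiredLong are hypothetical and not part of the original project.

```java
import java.util.Map;

// Hypothetical helper for reading numeric fields from a parsed JSON payload.
final class PayloadValues {
    private PayloadValues() {}

    // Reads a required field as long, accepting JSON numbers (1) and numeric strings ("1").
    static long requiredLong(Map<?, ?> payload, String key) {
        Object value = payload.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing field: " + key);
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        try {
            return Long.parseLong(value.toString().trim());
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException("Field is not a number: " + key, ex);
        }
    }
}
```

In the listener, a call such as PayloadValues.requiredLong(map, "postId") could then replace the cast-and-parse expression, under the assumption that rejecting the message is the desired reaction to a missing field.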

The consumer side now contains the code that creates a rental. Next, let's build the producer that generates the message.

  • post-service - pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • message.KafkaProducerConfig
package com.microservices.postservice.message;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • KafkaProducer
package com.microservices.postservice.message;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microservices.postservice.dto.PostDto;
import com.microservices.postservice.vo.RequestRental;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public RequestRental send(
        String topic,
        RequestRental postVo
    ) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString  = "";

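        try {
            jsonInString = mapper.writeValueAsString(postVo);
        } catch(JsonProcessingException ex) {
            // Fail fast so an empty payload is never published to the topic
            throw new IllegalStateException("Failed to serialize rental request", ex);
        }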

        kafkaTemplate.send(topic, jsonInString);

        log.info("Kafka Producer sent data from the Post Service: " + postVo);

        return postVo;
    }
}
  • RequestRental
package com.microservices.postservice.vo;

import lombok.Getter;

import javax.validation.constraints.NotNull;

@Getter
public class RequestRental {
    @NotNull(message="PostId cannot be null")
    Long postId;

    @NotNull(message="Owner cannot be null")
    String owner;

    @NotNull(message="Borrower cannot be null")
    String borrower;

    @NotNull(message="Price cannot be null")
    String price;

    @NotNull(message="StartDate cannot be null")
    String startDate;

    @NotNull(message="EndDate cannot be null")
    String endDate;
}
  • PostController
package com.microservices.postservice.controller;

import com.microservices.postservice.dto.CommentDto;
import com.microservices.postservice.dto.PostDto;
import com.microservices.postservice.message.KafkaProducer;
import com.microservices.postservice.service.CommentService;
import com.microservices.postservice.service.PostService;
import com.microservices.postservice.vo.RequestCreateComment;
import com.microservices.postservice.vo.RequestRental;
import com.microservices.postservice.vo.RequestWrite;
import com.microservices.postservice.vo.ResponsePost;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.List;

@RestController
@RequestMapping("/")
@Slf4j
public class PostController {
    private PostService postService;
    private CommentService commentService;
    private Environment env;
    private KafkaProducer kafkaProducer;

    @Autowired
    public PostController(
        PostService postService,
        CommentService commentService,
        Environment env,
        KafkaProducer kafkaProducer
    ) {
        this.postService = postService;
        this.commentService = commentService;
        this.env = env;
        this.kafkaProducer = kafkaProducer;
    }

    ...

    @PostMapping("/rental")
    public ResponseEntity<?> rental(@RequestBody RequestRental postVo) {
        log.info("Post Service's Controller Layer :: Call rental Method!");

        kafkaProducer.send("rental-topic", postVo);

        return ResponseEntity.status(HttpStatus.OK).body(postVo);
    }

    ...
}

With the rental method and the RequestRental class, which had been commented out earlier, now implemented, the producer side is complete.

#3 Test

In my environment, I start the ZooKeeper server and the Kafka server with the following commands from /home/biuea/Desktop/MyRentalKafka/kafka_2.12-2.8.0.

./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties

Next, start discover-service, config-server, apigateway-service, post-service, and rental-service.

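If everything started correctly, the rental-service console should show Kafka connection messages like the following. The LEADER_NOT_AVAILABLE warnings typically appear while the broker is still creating rental-topic and stop once a partition leader has been elected.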

2021-08-31 11:04:34.570  WARN 66104 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Error while fetching metadata with correlation id 2 : {rental-topic=LEADER_NOT_AVAILABLE}
2021-08-31 11:04:34.573  INFO 66104 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Cluster ID: iSLJTP3tSNy4lOSutcbneQ
2021-08-31 11:04:34.676  WARN 66104 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Error while fetching metadata with correlation id 4 : {rental-topic=LEADER_NOT_AVAILABLE}
2021-08-31 11:04:34.780  WARN 66104 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Error while fetching metadata with correlation id 6 : {rental-topic=LEADER_NOT_AVAILABLE}
2021-08-31 11:04:35.453  INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Discovered group coordinator linux:9092 (id: 2147483647 rack: null)
2021-08-31 11:04:35.456  INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] (Re-)joining group
2021-08-31 11:04:35.504  INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] (Re-)joining group
2021-08-31 11:04:35.529  INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Successfully joined group with generation Generation{generationId=1, memberId='consumer-consumerGroup-1-17923a64-158a-4dc1-bab1-ac42a0483940', protocol='range'}
2021-08-31 11:04:35.533  INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Finished assignment for group at generation 1: {consumer-consumerGroup-1-17923a64-158a-4dc1-bab1-ac42a0483940=Assignment(partitions=[rental-topic-0])}
2021-08-31 11:04:35.616  INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Successfully synced group in generation Generation{generationId=1, memberId='consumer-consumerGroup-1-17923a64-158a-4dc1-bab1-ac42a0483940', protocol='range'}
2021-08-31 11:04:35.616  INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Notifying assignor about the new Assignment(partitions=[rental-topic-0])
2021-08-31 11:04:35.620  INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Adding newly assigned partitions: rental-topic-0
2021-08-31 11:04:35.632  INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Found no committed offset for partition rental-topic-0
2021-08-31 11:04:35.638  INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Found no committed offset for partition rental-topic-0
2021-08-31 11:04:35.653  INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Resetting offset for partition rental-topic-0 to offset 0.
2021-08-31 11:04:35.686  INFO 66104 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : consumerGroup: partitions assigned: [rental-topic-0]

Now let's use Postman to send a rental request to post-service, which forwards it to the broker as a message.

First, let's find a post that is currently available for rental.

Post 1 is of the "빌려줄게요" ("I'll lend it") type, so let's make the rental through post 1.
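If you prefer code over Postman, the same request could be built with the JDK's java.net.http client (Java 11+). This is a sketch under assumptions: the URL below is hypothetical, since the host, port, and route prefix depend on how your gateway is configured, and the class name RentalRequestSketch is made up for illustration. The JSON body uses the same fields that RequestRental declares.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical client-side sketch for calling the /rental endpoint.
final class RentalRequestSketch {
    // Assumed URL; replace host, port, and prefix with the values of your gateway.
    static final String RENTAL_URL = "http://localhost:8000/post-service/rental";

    // Sample body with the fields declared in RequestRental.
    static final String SAMPLE_BODY =
            "{\"postId\":\"1\",\"owner\":\"test-01\",\"borrower\":\"test-02\","
            + "\"price\":\"10000\",\"startDate\":\"2021-08-25\",\"endDate\":\"2021-08-27\"}";

    private RentalRequestSketch() {}

    // Builds a JSON POST request; nothing is sent until an HttpClient executes it.
    static HttpRequest build(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json, StandardCharsets.UTF_8))
                .build();
    }
}
```

To actually send it while the services are running, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).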

The message was sent successfully, so let's check the rental-service console.

2021-08-31 11:21:51.586  INFO 77950 --- [ntainer#0-0-C-1] c.m.rentalservice.message.KafkaConsumer  : Kafka Message : {"postId":"1","owner":"test-01","borrower":"test-02","price":"10000","startDate":"2021-08-25","endDate":"2021-08-27"}
Hibernate: 
    /* insert com.microservices.rentalservice.entity.RentalEntity
        */ insert 
        into
            rentals
            (borrower, created_at, end_date, owner, post_id, price, rental_id, start_date) 
        values
            (?, ?, ?, ?, ?, ?, ?, ?)

And here is the database view.

The request message was delivered and stored in the database as expected. In the next post, we will return the rental result to post-service, look at using Kafka in more detail, and use FeignClient so that auth-service can fetch data from post-service and rental-service.

References
Inflearn: Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) (Developing Microservice Applications (MSA) with Spring Cloud) - 이도원 (Lee Dowon)
