Rental Application (React & Spring boot Microservice) - 36: 대여 서비스(1)

yellow_note·2021년 10월 7일
0

#1

rental-service는 대여를 담당하는 서비스입니다. 게시글을 통해 상품을 확인하고 마음에 드는 상품이 존재한다면 댓글, 채팅 서비스를 이용해 유저 간 대화를 나누고, 게시글의 대여 버튼을 이용해 대여를 합니다.

대여의 흐름은 다음과 같습니다. 1) 대여 버튼 클릭 -> 2) 게시글의 작성자는 마이 페이지에서 대여 요청 확인 -> 3) 대여 승낙 -> 4) 대여 완료

그러면 이러한 흐름이 서비스 단위에서는 어떻게 이루어지는지 살펴 보겠습니다.

1) post-service의 user-b가 작성자인 post로 대여를 요청합니다.

2) 만약 user-b가 요청을 수락한다면 kafka message queue에 메시지를 발행합니다.

3) rental-service는 이 kafka를 구독하고 있고, 메시지가 큐에 들어온다면 메시지를 가져옵니다.

4) 가져온 메시지를 바탕으로 대여 관련 트랜잭션을 수행합니다.

5) 데이터베이스에 데이터가 정상적으로 들어와있다면, Feign client를 이용해 user-a는 요청한 대여에 관한 데이터를 확인할 수 있습니다.

대략적으로 대여에 관한 흐름을 살펴보았으니 이전에 설치한 kafka를 이용하여 post-service와 rental-service 간 통신을 진행해보겠습니다.

#2 kafka 연동

post-service에 kafka 디펜던시를 추가하도록 하겠습니다.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

post-service -> rental-service로 메시지가 전달이 되기 때문에 post-service는 Producer, rental-service는 Consumer의 역할을 가지게 됩니다.

우선 Producer부분부터 작성하도록 하겠습니다.

  • ./message/KafkaProducerConfig
package com.microservices.postservice.message;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • ./message/KafkaProducer
package com.microservices.postservice.message;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microservices.postservice.dto.PostDto;
import com.microservices.postservice.vo.RequestRental;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public RequestRental send(
        String topic,
        RequestRental postVo
    ) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString  = "";

        try {
            jsonInString = mapper.writeValueAsString(postVo);
        } catch(JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);

        log.info("Kafka Producer sent data from the Post Service: " + postVo);

        return postVo;
    }
}

대여를 요청하는 메세지에 대해 작성을 했습니다. Config에서는 Json으로 이루어진 데이터를 String화시키고, 이를 이용하여 send 메서드에서 topic, 데이터를 담아 메시지 큐로 전송합니다.

이 producer를 컨트롤러에서 /post-service/rental 요청 시 호출하고 동시에 관련 post의 status도 업데이트하도록 하겠습니다.

  • ./controller/PostController
@PostMapping("/rental")
public ResponseEntity<?> rental(@RequestBody RequestRental postVo) {
    log.info("Post Service's Controller Layer :: Call rental Method!");

    kafkaProducer.send("rental-topic", postVo);

    postService.rental(postVo.getPostId());

    return ResponseEntity.status(HttpStatus.OK).body(postVo);
}
  • ./service/PostServiceImpl
@Transactional
@Override
public void rental(Long id) {
    log.info("Post Service's Service Layer :: Call rental Method!");

    PostEntity entity = postRepository.findPostById(id);

    entity.setStatus(PostStatus.COMPLETE_RENTAL.name());

    postRepository.save(entity);
}

이어서 rental-service에서 Consumer를 만들도록 하겠습니다.

  • ./message/KafkaConsumerConfig
package com.microservices.rentalservice.message;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();

        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }
}
  • ./message/KafkaConsumer
package com.microservices.rentalservice.message;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microservices.rentalservice.entity.RentalEntity;
import com.microservices.rentalservice.repository.RentalRepository;
import com.microservices.rentalservice.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Service
@Slf4j
public class KafkaConsumer {
    RentalRepository rentalRepository;

    @Autowired
    public KafkaConsumer(RentalRepository rentalRepository) {
        this.rentalRepository = rentalRepository;
    }

    @KafkaListener(topics="rental-topic")
    public void requestRental(String kafkaMessage) {
        log.info("Kafka Message : " + kafkaMessage);

        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();

        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch(JsonProcessingException ex) {
            ex.printStackTrace();
        }

        RentalEntity rentalEntity = RentalEntity.builder()
                                                .rentalId(UUID.randomUUID().toString())
                                                .postId(Long.parseLong(String.valueOf(map.get("postId"))))
                                                .owner((String)map.get("owner"))
                                                .borrower((String)map.get("borrower"))
                                                .price(Long.parseLong(String.valueOf(map.get("price"))))
                                                .startDate((String)map.get("startDate"))
                                                .endDate((String)map.get("endDate"))
                                                .createdAt(DateUtil.dateNow())
                                                .build();

        rentalRepository.save(rentalEntity);
    }
}

Consumer는 post-service에서 전송된 토픽을 메시지 큐를 구독하고 있다가 토픽에 메시지가 들어오면 이 메시지를 받아와 그 안에 있는 데이터를 deserialize시켜 데이터베이스에 저장하는 흐름을 가지고 있습니다.

이렇게 kafka를 각 서비스에 연동을 해보았고 실제로 테스트를 진행해보도록 하겠습니다.

#3 테스트

kafka, zookeeper 서버를 실행하도록 하겠습니다.

./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties

현재 post-service 데이터베이스의 데이터입니다. 이 중 id가 4번인 데이터를 대상으로 대여를 테스트해보겠습니다.

예상되는 결과는 id가 4인 게시글의 상태가 COMPLETE_RENTAL로 수정되며, 대여 데이터 생성입니다.

요청 결과 COMPLETE_RENTAL로 잘 수정된 모습입니다.

대여 데이터도 확인해보겠습니다.

bbb라는 borrower가 asd 물품을 잘 대여한 모습을 확인할 수 있습니다.

kafka를 이용하여 메시지 기반의 통신을 해보았습니다. 다음 포스트에서는 좀 더 디테일하게 대여를 구현하도록 하겠습니다.

0개의 댓글