rental-service는 대여를 담당하는 서비스입니다. 게시글을 통해 상품을 확인하고 마음에 드는 상품이 존재한다면 댓글, 채팅 서비스를 이용해 유저 간 대화를 나누고, 게시글의 대여 버튼을 이용해 대여를 합니다.
대여의 흐름은 다음과 같습니다. 1) 대여 버튼 클릭 -> 2) 게시글의 작성자는 마이 페이지에서 대여 요청 확인 -> 3) 대여 승낙 -> 4) 대여 완료
그러면 이러한 흐름이 서비스 단위에서는 어떻게 이루어지는지 살펴 보겠습니다.
1) post-service의 user-b가 작성자인 post로 대여를 요청합니다.
2) 만약 user-b가 요청을 수락한다면 kafka message queue에 메시지를 발행합니다.
3) rental-service는 이 kafka를 구독하고 있고, 메시지가 큐에 들어온다면 메시지를 가져옵니다.
4) 가져온 메시지를 바탕으로 대여 관련 트랜잭션을 수행합니다.
5) 데이터베이스에 데이터가 정상적으로 들어와있다면, Feign client를 이용해 user-a는 요청한 대여에 관한 데이터를 확인할 수 있습니다.
대략적으로 대여에 관한 흐름을 살펴보았으니 이전에 설치한 kafka를 이용하여 post-service와 rental-service 간 통신을 진행해보겠습니다.
post-service에 kafka 디펜던시를 추가하도록 하겠습니다.
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
post-service -> rental-service로 메시지가 전달이 되기 때문에 post-service는 Producer, rental-service는 Consumer의 역할을 가지게 됩니다.
우선 Producer부분부터 작성하도록 하겠습니다.
package com.microservices.postservice.message;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
package com.microservices.postservice.message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microservices.postservice.dto.PostDto;
import com.microservices.postservice.vo.RequestRental;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class KafkaProducer {
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public RequestRental send(
String topic,
RequestRental postVo
) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(postVo);
} catch(JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the Post Service: " + postVo);
return postVo;
}
}
대여를 요청하는 메세지에 대해 작성을 했습니다. Config에서는 Json으로 이루어진 데이터를 String화시키고, 이를 이용하여 send 메서드에서 topic, 데이터를 담아 메시지 큐로 전송합니다.
이 producer를 컨트롤러에서 /post-service/rental 요청 시 호출하고 동시에 관련 post의 status도 업데이트하도록 하겠습니다.
@PostMapping("/rental")
public ResponseEntity<?> rental(@RequestBody RequestRental postVo) {
log.info("Post Service's Controller Layer :: Call rental Method!");
kafkaProducer.send("rental-topic", postVo);
postService.rental(postVo.getPostId());
return ResponseEntity.status(HttpStatus.OK).body(postVo);
}
@Transactional
@Override
public void rental(Long id) {
log.info("Post Service's Service Layer :: Call rental Method!");
PostEntity entity = postRepository.findPostById(id);
entity.setStatus(PostStatus.COMPLETE_RENTAL.name());
postRepository.save(entity);
}
이어서 rental-service에서 Consumer를 만들도록 하겠습니다.
package com.microservices.rentalservice.message;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
package com.microservices.rentalservice.message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microservices.rentalservice.entity.RentalEntity;
import com.microservices.rentalservice.repository.RentalRepository;
import com.microservices.rentalservice.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Service
@Slf4j
public class KafkaConsumer {
RentalRepository rentalRepository;
@Autowired
public KafkaConsumer(RentalRepository rentalRepository) {
this.rentalRepository = rentalRepository;
}
@KafkaListener(topics="rental-topic")
public void requestRental(String kafkaMessage) {
log.info("Kafka Message : " + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch(JsonProcessingException ex) {
ex.printStackTrace();
}
RentalEntity rentalEntity = RentalEntity.builder()
.rentalId(UUID.randomUUID().toString())
.postId(Long.parseLong(String.valueOf(map.get("postId"))))
.owner((String)map.get("owner"))
.borrower((String)map.get("borrower"))
.price(Long.parseLong(String.valueOf(map.get("price"))))
.startDate((String)map.get("startDate"))
.endDate((String)map.get("endDate"))
.createdAt(DateUtil.dateNow())
.build();
rentalRepository.save(rentalEntity);
}
}
Consumer는 post-service에서 전송된 토픽을 메시지 큐를 구독하고 있다가 토픽에 메시지가 들어오면 이 메시지를 받아와 그 안에 있는 데이터를 deserialize시켜 데이터베이스에 저장하는 흐름을 가지고 있습니다.
이렇게 kafka를 각 서비스에 연동을 해보았고 실제로 테스트를 진행해보도록 하겠습니다.
kafka, zookeeper 서버를 실행하도록 하겠습니다.
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
현재 post-service 데이터베이스의 데이터입니다. 이 중 id가 4번인 데이터를 대상으로 대여를 테스트해보겠습니다.
예상되는 결과는 id가 4인 게시글의 상태가 COMPLETE_RENTAL로 수정되며, 대여 데이터 생성입니다.
요청 결과 COMPLETE_RENTAL로 잘 수정된 모습입니다.
대여 데이터도 확인해보겠습니다.
bbb라는 borrower가 asd 물품을 잘 대여한 모습을 확인할 수 있습니다.
kafka를 이용하여 메시지 기반의 통신을 해보았습니다. 다음 포스트에서는 좀 더 디테일하게 대여를 구현하도록 하겠습니다.