Before applying Kafka, let's go over the scenario for creating a rental.
With this scenario in mind, let's apply Kafka to the application.
We'll start by implementing the consumer. A consumer subscribes to the message broker and reads messages as they arrive on the topics it subscribes to. Between post-service and rental-service, it is rental-service that has to receive the message and actually process the rental, so rental-service takes the consumer role. First, add the spring-kafka dependency to rental-service's pom.xml.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
package com.microservices.rentalservice.message;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();

        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }
}
package com.microservices.rentalservice.message;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microservices.rentalservice.entity.RentalEntity;
import com.microservices.rentalservice.repository.RentalRepository;
import com.microservices.rentalservice.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Service
@Slf4j
public class KafkaConsumer {
    RentalRepository rentalRepository;

    @Autowired
    public KafkaConsumer(RentalRepository rentalRepository) {
        this.rentalRepository = rentalRepository;
    }

    // Invoked whenever a message arrives on the rental-topic topic.
    @KafkaListener(topics="rental-topic")
    public void requestRental(String kafkaMessage) {
        log.info("Kafka Message : " + kafkaMessage);

        // Deserialize the JSON payload into a Map.
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();

        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch(JsonProcessingException ex) {
            ex.printStackTrace();
        }

        // Build the rental record from the message fields and persist it.
        RentalEntity rentalEntity = RentalEntity.builder()
                                                .rentalId(UUID.randomUUID().toString())
                                                .postId(Long.parseLong((String) map.get("postId")))
                                                .owner((String) map.get("owner"))
                                                .borrower((String) map.get("borrower"))
                                                .price(Long.parseLong((String) map.get("price")))
                                                .startDate((String) map.get("startDate"))
                                                .endDate((String) map.get("endDate"))
                                                .createdAt(DateUtil.dateNow())
                                                .build();

        rentalRepository.save(rentalEntity);
    }
}
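For reference, DateUtil.dateNow() is a small helper from the project that isn't shown in this post. A purely hypothetical sketch of what it might look like, assuming createdAt is stored as a String (the real implementation may differ):

package com.microservices.rentalservice.util;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch only; the project's actual DateUtil is not shown in this post.
public class DateUtil {
    // Returns the current date and time as a formatted String, e.g. "2021-08-31 11:21:51".
    public static String dateNow() {
        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    }
}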
With the consumer side now creating rentals, let's build the producer that publishes the message. As with the consumer, add the spring-kafka dependency, this time to post-service's pom.xml.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
package com.microservices.postservice.message;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
package com.microservices.postservice.message;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microservices.postservice.vo.RequestRental;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaProducer {
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Serializes the rental request to JSON and publishes it to the given topic.
    public RequestRental send(
        String topic,
        RequestRental postVo
    ) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";

        try {
            jsonInString = mapper.writeValueAsString(postVo);
        } catch(JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic, jsonInString);
        log.info("Kafka Producer sent data from the Post Service: " + postVo);

        return postVo;
    }
}
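As a side note, kafkaTemplate.send() is asynchronous, and the code above never checks whether the broker actually accepted the message. A minimal sketch of how the kafkaTemplate.send(topic, jsonInString) line could be extended with callbacks, assuming spring-kafka 2.x where send() returns a ListenableFuture<SendResult<K, V>> (this is not part of the original code):

// Sketch only: attach success/failure callbacks to the asynchronous send.
kafkaTemplate.send(topic, jsonInString).addCallback(
    result -> log.info("Message published to topic " + topic),
    ex -> log.error("Failed to publish message to topic " + topic, ex)
);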
package com.microservices.postservice.vo;

import lombok.Getter;

import javax.validation.constraints.NotNull;

@Getter
public class RequestRental {
    @NotNull(message="PostId cannot be null")
    Long postId;

    @NotNull(message="Owner cannot be null")
    String owner;

    @NotNull(message="Borrower cannot be null")
    String borrower;

    @NotNull(message="Price cannot be null")
    String price;

    @NotNull(message="StartDate cannot be null")
    String startDate;

    @NotNull(message="EndDate cannot be null")
    String endDate;
}
package com.microservices.postservice.controller;

import com.microservices.postservice.dto.CommentDto;
import com.microservices.postservice.dto.PostDto;
import com.microservices.postservice.message.KafkaProducer;
import com.microservices.postservice.service.CommentService;
import com.microservices.postservice.service.PostService;
import com.microservices.postservice.vo.RequestCreateComment;
import com.microservices.postservice.vo.RequestRental;
import com.microservices.postservice.vo.RequestWrite;
import com.microservices.postservice.vo.ResponsePost;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.List;

@RestController
@RequestMapping("/")
@Slf4j
public class PostController {
    private PostService postService;
    private CommentService commentService;
    private Environment env;
    private KafkaProducer kafkaProducer;

    @Autowired
    public PostController(
        PostService postService,
        CommentService commentService,
        Environment env,
        KafkaProducer kafkaProducer
    ) {
        this.postService = postService;
        this.commentService = commentService;
        this.env = env;
        this.kafkaProducer = kafkaProducer;
    }

    ...

    @PostMapping("/rental")
    public ResponseEntity<?> rental(@RequestBody RequestRental postVo) {
        log.info("Post Service's Controller Layer :: Call rental Method!");

        kafkaProducer.send("rental-topic", postVo);

        return ResponseEntity.status(HttpStatus.OK).body(postVo);
    }

    ...
}
The producer side is now complete: we've implemented the rental method and the RequestRental class that had previously been left commented out.
Next, let's start the ZooKeeper and Kafka servers. On my machine they live under /home/biuea/Desktop/MyRentalKafka/kafka_2.12-2.8.0, and I start them from that directory with the following commands.
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
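Once both servers are up, you can optionally confirm the broker is reachable by listing its topics. rental-topic will not appear yet; with the default broker settings it is auto-created the first time it is used.

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list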
Then start discover-service, config-server, apigateway-service, post-service, and rental-service.
If everything came up correctly, the rental-service console should show Kafka connection messages like the ones below. The LEADER_NOT_AVAILABLE warnings at the beginning are expected on a first run: rental-topic does not exist yet, and they stop once the broker auto-creates it.
2021-08-31 11:04:34.570 WARN 66104 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Error while fetching metadata with correlation id 2 : {rental-topic=LEADER_NOT_AVAILABLE}
2021-08-31 11:04:34.573 INFO 66104 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Cluster ID: iSLJTP3tSNy4lOSutcbneQ
2021-08-31 11:04:34.676 WARN 66104 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Error while fetching metadata with correlation id 4 : {rental-topic=LEADER_NOT_AVAILABLE}
2021-08-31 11:04:34.780 WARN 66104 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Error while fetching metadata with correlation id 6 : {rental-topic=LEADER_NOT_AVAILABLE}
2021-08-31 11:04:35.453 INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Discovered group coordinator linux:9092 (id: 2147483647 rack: null)
2021-08-31 11:04:35.456 INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] (Re-)joining group
2021-08-31 11:04:35.504 INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] (Re-)joining group
2021-08-31 11:04:35.529 INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Successfully joined group with generation Generation{generationId=1, memberId='consumer-consumerGroup-1-17923a64-158a-4dc1-bab1-ac42a0483940', protocol='range'}
2021-08-31 11:04:35.533 INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Finished assignment for group at generation 1: {consumer-consumerGroup-1-17923a64-158a-4dc1-bab1-ac42a0483940=Assignment(partitions=[rental-topic-0])}
2021-08-31 11:04:35.616 INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Successfully synced group in generation Generation{generationId=1, memberId='consumer-consumerGroup-1-17923a64-158a-4dc1-bab1-ac42a0483940', protocol='range'}
2021-08-31 11:04:35.616 INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Notifying assignor about the new Assignment(partitions=[rental-topic-0])
2021-08-31 11:04:35.620 INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Adding newly assigned partitions: rental-topic-0
2021-08-31 11:04:35.632 INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Found no committed offset for partition rental-topic-0
2021-08-31 11:04:35.638 INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Found no committed offset for partition rental-topic-0
2021-08-31 11:04:35.653 INFO 66104 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-consumerGroup-1, groupId=consumerGroup] Resetting offset for partition rental-topic-0 to offset 0.
2021-08-31 11:04:35.686 INFO 66104 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : consumerGroup: partitions assigned: [rental-topic-0]
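If you want to watch the topic independently of rental-service, the console consumer that ships with Kafka can be attached to the same topic (optional, purely for debugging):

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rental-topic --from-beginning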
Now let's use Postman to send a rental request to post-service so that it publishes the message.
First, let's find a post that is currently available to rent.
Post #1 is a "빌려줄게요" (lend) type post, so we'll proceed with the rental against post #1, sending the request to post-service's POST /rental endpoint with a body like the one shown below.
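The exact URL depends on the apigateway-service routing, which isn't covered here, but the request body simply matches the RequestRental fields; it is the same JSON that shows up in the consumer log further down:

{
    "postId": "1",
    "owner": "test-01",
    "borrower": "test-02",
    "price": "10000",
    "startDate": "2021-08-25",
    "endDate": "2021-08-27"
}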
The message was sent successfully; let's check the rental-service console.
2021-08-31 11:21:51.586 INFO 77950 --- [ntainer#0-0-C-1] c.m.rentalservice.message.KafkaConsumer : Kafka Message : {"postId":"1","owner":"test-01","borrower":"test-02","price":"10000","startDate":"2021-08-25","endDate":"2021-08-27"}
Hibernate:
    /* insert com.microservices.rentalservice.entity.RentalEntity
        */ insert
        into
            rentals
            (borrower, created_at, end_date, owner, post_id, price, rental_id, start_date)
        values
            (?, ?, ?, ?, ?, ?, ?, ?)
And here is the database screen.
The rental request message was delivered, and the rental was saved to the database. In the next post, we'll have post-service receive a result back for the rental, look at using Kafka in more detail, and use FeignClient so that auth-service can fetch data from post-service and rental-service.
Reference
Inflearn: Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) - 이도원