order-service 에서 주문을 하게 되면 catalog-service 에서는 주문 수량이 그만큼 감소를 해야한다.
order-service -> Meessage Queuing Service(Kafka) -> catalog-service
// Kafka 사용
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String,String> consumerFactory() {
Map<String,Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 서버 주소
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // 여러 가지 컨슈머가 있을 경우 groupId 로 구분
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
// 위에서 만든 Factory 등록
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
// KafkaConsumer (@Service)
// 해당 토픽에 데이터가 전달되면 그 데이터가 값을 가지고 와서 updateQty 가 실행된다.
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message: ->" + kafkaMessage);
Map<Object , Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
// Kafka Message 는 String 형태로 들어오지만 그것을 Json 으로 변경하기 위한 로직
// 변경하다가 예외가 발생할 수 있기 때문에 try catch 로 잡아준다.
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
CatalogEntity entity = repository.findByProductId((String) map.get("productId"));
// Null 체크를 하는것도 중요하다.
// if (entity != null) ...
if (entity != null) {
entity.setStock(entity.getStock() - (Integer) map.get("qty"));
repository.save(entity);
}
}
// Kafka 사용
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String,String> producerFactory() {
Map<String,Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 서버 주소
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String , String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
// KafkaProducer.class
// topic 에 order 의 정보를 보내는 method
public OrderDto send(String topic , OrderDto orderDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
// json format 으로 변경
jsonInString = mapper.writeValueAsString(orderDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic , jsonInString);
log.info("kafka Producer sent data from the Order microservice : " + orderDto);
return orderDto;
}
모든 서버 및 서비스를 기동한 후 POSTMAN 으로 order-service 에서 order 를 만든다.
만약 orderService 가 1개가 아니라 여러개의 orderService 를 기동하는 경우에는 어떻게 Kafka 를 사용해야하는지 알아보자.
order-service 를 2개 기동 (A,B)
이때 더 큰 문제는 user-service 에서 user-service/users/{uesrId} 를 요청하면 한 번은 A-service 로 가서 3개의 order 데이터를 받고 그 다음은 B-service 로 가서 2개의 order 데이터를 받게 된다.
order-service 의 JPA 데이터베이스 교체
spring:
jpa:
hibernate:
ddl-auto: update
datasource:
url: jdbc:mysql://localhost:3306/mydb
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password:
// order-service.controller.createOrder() 일부분
OrderDto createdOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = modelMapper.map(createdOrder , ResponseOrder.class);
위에 코드는 Order 객체를 만들고 DB 에 저장을 한 다음에 ResponseOrder 로 반환시키는 부분이다.
이부분을 Kafka 에다가 메시지를 보내도록 바꾼다.
우리가 가지고 있던 주문 정보를 어떻게 topic 에 보낼지가 중요하다.
그래서 topic 에 보내기 위해서는 위와 같은 format 을 맞춰야한다.
그런후 OrderProducer 를 생성한다. (@Service)
order-serivce 를 위해서 kafka sink connector 를 기동하고 order-service 를 위한 connector 를 추가해야한다.
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
@Service
@Slf4j
public class KafkaOrderProducer {
private KafkaTemplate<String,String> kafkaTemplate;
List<Field> fields = Arrays.asList(
new Field("string",true,"order_id"),
new Field("string",true,"user_id"),
new Field("string",true,"product_id"),
new Field("int32",true,"qty"),
new Field("int32",true,"unit_price"),
new Field("int32",true,"total_price")
);
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
// topic 에 order 의 정보를 보내는 method
public OrderDto send(String topic , OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.total_price(orderDto.getTotalPrice())
.user_id(orderDto.getUserId())
.unit_price(orderDto.getUnitPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema,payload);
// Schema , Payload 가 담긴 메시지를 만들어야 한다.
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
// json format 으로 변경
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic , jsonInString);
return orderDto;
}
}
{
"name":"my-order-sink-connector",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"orders"
}
}
이제 다시 order-service 를 2개 기동해본다.
POSTMAN 에서 /order 를 실행해보면 order-service 2개를 번갈아가면서 호출하는 것을 확인했다.