OrderService가 여러개 실행되었을때 각각의 H2 DB에 따로 데이터가 저장되는 일이 발생할 수 있다. 이때 하나의 DB로 단일화 시킴으로써 해결할 수 있다.
implementation 'org.mariadb.jdbc:mariadb-java-client:2.7.8'
implementation 'mysql:mysql-connector-java:8.0.29'
spring:
application:
name: order-service
h2:
console:
enabled: true
settings:
web-allow-others: true
path: /h2-console
jpa:
hibernate:
ddl-auto: update
datasource:
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mysql://localhost/mydb
username: [userName]
password: [passWord]
@RestController
@Slf4j
@RequestMapping("/order-service")
public class OrderController {
private Environment env;
private OrderService orderService;
private KafkaProducer kafkaProducer;
private OrderProducer orderProducer;
public OrderController(Environment env,
OrderService orderService,
KafkaProducer kafkaProducer,
OrderProducer orderProducer) {
this.env = env;
this.orderService = orderService;
this.kafkaProducer = kafkaProducer;
this.orderProducer = orderProducer;
}
...
// http://127.0.0.1:0/order-service/{user_id}/orders/
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
@RequestBody RequestOrder orderDetails) {
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
/* JPA */
// OrderDto createdOrder = orderService.createOrder(orderDto);
// ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
/* Kafka */
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(orderDetails.getUnitPrice() * orderDetails.getQty());
/* send this order to the kafka */
kafkaProducer.send("example-catalog-topic", orderDto);
orderProducer.send("orders", orderDto);
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
...
}
// Field.java
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
// Schema.java
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
// Payload.java
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}
// KafkaOrderDto.java
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
@Service // 스프링에서 빈으로 등록될 클래스임을 선언
@Slf4j // lombok 어노테이션으로, Logger 객체를 자동 생성해준다.
public class OrderProducer {
private KafkaTemplate<String, String> kafkaTemplate; // 카프카 프로듀서 객체
// 카프카 메시지의 스키마를 정의하기 위한 필드 리스트
List<Field> fields = Arrays.asList(
new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price")
);
// 스키마 객체 생성
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
// KafkaTemplate 객체를 주입받는 생성자
@Autowired
public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// 카프카 토픽에 메시지를 보내는 메소드
public OrderDto send(String topic, OrderDto orderDto) {
// Payload 객체 생성
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
// 카프카 메시지로 전송될 KafkaOrderDto 객체 생성
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
// Jackson ObjectMapper 객체 생성
ObjectMapper mapper = new ObjectMapper();
String jsonInstring = "";
try {
// KafkaOrderDto 객체를 JSON 문자열로 변환
jsonInstring = mapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// 카프카 템플릿을 사용하여 메시지를 보내고, 로그 출력
kafkaTemplate.send(topic, jsonInstring);
log.info("Order Producer sent data from the Order microservice: " + kafkaOrderDto);
// OrderDto 객체 반환
return orderDto;
}
}
POST
http://127.0.0.1:8083/connectors
// JSON 형태
{
"name" : "my-order-sink-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"DBusername",
"connection.password":"DBpassword",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"orders"
}
}