create table orders(
id int auto_increment primary key,
user_id varchar(50) not null,
product_id varchar(20) not null,
order_id varchar(50) not null,
qty int default 0,
unit_price int default 0,
total_price int default 0,
created_at datetime default now()
);
maria db에 쿼리문을 통해 orders 테이블을 추가한다.
datasource:
# driver-class-name: org.h2.Driver
# url: jdbc:h2:mem:testdb
url: jdbc:mariadb://localhost:3306/mydb
driver-class-name: org.mariadb.jdbc.Driver
username: root
password: 비밀번호
설정 yml 파일에 다음과 같은 내용으로 변경한다.
의존성에 mariadb 의존성이 추가되어 있을 경우에 가능하다.
그리고 서비스들을 실행시켜 다음과 같이 주문을 생성했다.
maria db에서도 정상적으로 전송된 데이터를 확인할 수 있다.
이건 이제 원래 로직상 정상적으로 추가되던 데이터들이고 kafka를 통해서 진행해보자
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
...
],
"optional": false,
"name": "users"
},
"payload": {
...
}
}
이전에 우리가 kafka에서 db insert가 일어났을 때 다음과 같이 데이터가 전달되었던 것을 확인할 수 있었다.
schema
는 db의 table 정보를 fields에 넣어서 전달 했었고 payload
는 데이터의 부분을 전달했었는데 이것을 java에서 구현해보자.
@Data
@Builder
public class Field {
private String type;
private boolean optional;
private String field;
}
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
@Data
@Builder
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
위 객체들의 정보를 KafkaOrderDto에 담아서 전달하게 된다면 이전에 우리가 Kafka에서 db insert가 일어났을 때 전달되던 내용이 그대로 사용되는 것을 알수 있다.
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaOrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(
Field.builder().type("string").optional(true).field("order_id").build(),
Field.builder().type("string").optional(true).field("user_id").build(),
Field.builder().type("string").optional(true).field("product_id").build(),
Field.builder().type("int32").optional(true).field("qty").build(),
Field.builder().type("int32").optional(true).field("unit_price").build(),
Field.builder().type("int32").optional(true).field("total_price").build()
);
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
public OrderDto orderSend(String topic, OrderDto orderDto){
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = KafkaOrderDto.builder()
.schema(schema)
.payload(payload)
.build();
ObjectMapper mapper = new ObjectMapper();
//json format으로 변경
String json = "";
try {
json = mapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
//kafka 메세지 전송
kafkaTemplate.send(topic, json);
log.info("Kafka Producer send data from the order service = {}",kafkaOrderDto);
return orderDto;
}
}
이전에 Producer로 Kafka send를 실행했었는데 해당 로직을 데이터가 잘 들어갈 수 있게 코드를 작성했다.
@RestController
@RequestMapping("/order-service")
@RequiredArgsConstructor
public class OrderController {
private final Environment env;
private final OrderService orderService;
private final KafkaProducer kafkaProducer; //kafka producer 주입
private final KafkaOrderProducer kafkaOrderProducer; //주문 전송 producer 주입
...
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder){
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
//기존의 jpa 로직
// OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
// orderDto.setUserId(userId);
// OrderDto createOrder = orderService.createOrder(orderDto);
// ResponseOrder responseOrder = mapper.map(createOrder, ResponseOrder.class);
OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
orderDto.setUserId(userId);
//kafka 주문 로직 추가
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(requestOrder.getQty() * requestOrder.getUnitPrice());
//kafka 메세지 전송
kafkaProducer.orderSend("example-catalog-topic", orderDto);
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
kafkaOrderProducer.orderSend("orders", orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
}
여기서 중요한것은 기존의 jpa를 사용하여 데이터를 직접 db에 저장시키는 로직을 주석처리한다는 점이다.
OrderController
에서도 jpa로직 대신 kafka에 데이터를 전송하는 로직만 추가한다.
{
"name":"my-order-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"비밀번호",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"orders"
}
}
다음과 같이 sink connect를 등록해준다.
./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning
명령어로 Kafka Consumer를 하나 띄워서 메세지가 전달되는 것을 확인하게 할 수 있고
다음과 같이 order-service
를 2개 이상 띄워둔다.
Ereka에서도 확인이 가능하다.
이렇게 데이터를 전송하고 확인을 위해 값들을 바꾸어 여러번 전송해봤다.
그동안 전송된 데이터들이 모두 잘 입력된 것을 확인할 수 있다.
혹시라도 전송은 성공적으로 전달이 되었는데 db에 아무런 반응이 없다면 kafka server의 모든 서버를 재실행 해보길 바란다. message 자체는 사라지지 않아서 데이터 손실은 없으니 걱정하지말고 서버를 껐다 키면 데이터가 모두 적용되어져 있을것이다.
영화관 알바하시던 시절부터 남몰래 흠모하면서 오랫동안 지켜봐왔습니다.. 팬이에요... JPA관련해서 업데이트 하실 생각은 없으신지요