Kafka Spring 활용 [2] Connect 사용

최준호·2022년 3월 19일
0

Microservice Architecture

목록 보기
28/32
post-thumbnail

🔨Maria DB 설정

 create table orders(
     id int auto_increment primary key,
     user_id varchar(50) not null,
     product_id varchar(20) not null,
     order_id varchar(50) not null,
     qty int default 0,
     unit_price int default 0,
     total_price int default 0,
     created_at datetime default now()
);

maria db에 쿼리문을 통해 orders 테이블을 추가한다.

  datasource:
#    driver-class-name: org.h2.Driver
#    url: jdbc:h2:mem:testdb
    url: jdbc:mariadb://localhost:3306/mydb
    driver-class-name: org.mariadb.jdbc.Driver
    username: root
    password: 비밀번호

설정 yml 파일에 다음과 같은 내용으로 변경한다.

의존성에 mariadb 의존성이 추가되어 있을 경우에 가능하다.


그리고 서비스들을 실행시켜 다음과 같이 주문을 생성했다.

maria db에서도 정상적으로 전송된 데이터를 확인할 수 있다.

이건 이제 원래 로직상 정상적으로 추가되던 데이터들이고 kafka를 통해서 진행해보자

🔨Order Service 수정

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      ...
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    ...
  }
}

이전에 우리가 kafka에서 db insert가 일어났을 때 다음과 같이 데이터가 전달되었던 것을 확인할 수 있었다.

schema는 db의 table 정보를 fields에 넣어서 전달 했었고 payload는 데이터의 부분을 전달했었는데 이것을 java에서 구현해보자.

👉Field 구현

@Data
@Builder
public class Field {
    private String type;
    private boolean optional;
    private String field;
}

👉Payload 구현

@Data
@Builder
public class Payload {
    private String order_id;
    private String user_id;
    private String product_id;
    private int qty;
    private int unit_price;
    private int total_price;
}

👉Schema 구현

@Data
@Builder
public class Schema {
    private String type;
    private List<Field> fields;
    private boolean optional;
    private String name;
}

👉KafkaOrderDto 구현

@Data
@Builder
public class KafkaOrderDto implements Serializable {
    private Schema schema;
    private Payload payload;
}

위 객체들의 정보를 KafkaOrderDto에 담아서 전달하게 된다면 이전에 우리가 Kafka에서 db insert가 일어났을 때 전달되던 내용이 그대로 사용되는 것을 알수 있다.

👉KafkaOrderProducer 구현

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaOrderProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    List<Field> fields = Arrays.asList(
            Field.builder().type("string").optional(true).field("order_id").build(),
            Field.builder().type("string").optional(true).field("user_id").build(),
            Field.builder().type("string").optional(true).field("product_id").build(),
            Field.builder().type("int32").optional(true).field("qty").build(),
            Field.builder().type("int32").optional(true).field("unit_price").build(),
            Field.builder().type("int32").optional(true).field("total_price").build()
    );

    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();

    public OrderDto orderSend(String topic, OrderDto orderDto){

        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build();

        KafkaOrderDto kafkaOrderDto = KafkaOrderDto.builder()
                .schema(schema)
                .payload(payload)
                .build();

        ObjectMapper mapper = new ObjectMapper();
        //json format으로 변경
        String json = "";
        try {
            json = mapper.writeValueAsString(kafkaOrderDto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        //kafka 메세지 전송
        kafkaTemplate.send(topic, json);
        log.info("Kafka Producer send data from the order service = {}",kafkaOrderDto);

        return orderDto;
    }
}

이전에 Producer로 Kafka send를 실행했었는데 해당 로직을 데이터가 잘 들어갈 수 있게 코드를 작성했다.

👉Order Controller 수정

@RestController
@RequestMapping("/order-service")
@RequiredArgsConstructor
public class OrderController {
    private final Environment env;
    private final OrderService orderService;
    private final KafkaProducer kafkaProducer;  //kafka producer 주입
    private final KafkaOrderProducer kafkaOrderProducer;    //주문 전송 producer 주입

	...

    @PostMapping("/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder requestOrder){
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        //기존의 jpa 로직
//        OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
//        orderDto.setUserId(userId);
//        OrderDto createOrder = orderService.createOrder(orderDto);
//        ResponseOrder responseOrder = mapper.map(createOrder, ResponseOrder.class);
        OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
        orderDto.setUserId(userId);
        //kafka 주문 로직 추가
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(requestOrder.getQty() * requestOrder.getUnitPrice());

        //kafka 메세지 전송
        kafkaProducer.orderSend("example-catalog-topic", orderDto);

        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
        kafkaOrderProducer.orderSend("orders", orderDto);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }
}

여기서 중요한것은 기존의 jpa를 사용하여 데이터를 직접 db에 저장시키는 로직을 주석처리한다는 점이다.

OrderController에서도 jpa로직 대신 kafka에 데이터를 전송하는 로직만 추가한다.

👉Kafka Sink Connect 등록

{
    "name":"my-order-sink-connect",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"비밀번호",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"orders"
    }
}

다음과 같이 sink connect를 등록해준다.

👏Test 해보기

👉Consumer 실행해두기

./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning

명령어로 Kafka Consumer를 하나 띄워서 메세지가 전달되는 것을 확인하게 할 수 있고

👉Order Service 2개 띄우기

다음과 같이 order-service를 2개 이상 띄워둔다.

Ereka에서도 확인이 가능하다.

이렇게 데이터를 전송하고 확인을 위해 값들을 바꾸어 여러번 전송해봤다.

👊db 확인하기

그동안 전송된 데이터들이 모두 잘 입력된 것을 확인할 수 있다.

혹시라도 전송은 성공적으로 전달이 되었는데 db에 아무런 반응이 없다면 kafka server의 모든 서버를 재실행 해보길 바란다. message 자체는 사라지지 않아서 데이터 손실은 없으니 걱정하지말고 서버를 껐다 키면 데이터가 모두 적용되어져 있을것이다.

profile
해당 주소로 이전하였습니다. 감사합니다. https://ililil9482.tistory.com

1개의 댓글

comment-user-thumbnail
2022년 3월 20일

영화관 알바하시던 시절부터 남몰래 흠모하면서 오랫동안 지켜봐왔습니다.. 팬이에요... JPA관련해서 업데이트 하실 생각은 없으신지요

답글 달기