Spring Cloud + MSA 애플리케이션 개발 11(데이터 동기화를 위한 Kafka 2)

지원·2024년 2월 29일
0

MSA개발

목록 보기
11/15

OrderService , CatalogsService 에 Kafka 적용

order-service 에서 주문을 하게 되면 catalog-service 에서는 주문 수량이 그만큼 감소를 해야한다.

  • 기본적인 Front-Back 방식에서는 공통의 DB 를 사용기 때문에 Table 을 공유해서 상관이 없다.
  • 하지만 Microservice 를 사용하면서 우리는 독립적인 DB 를 가지도록 했기 때문에 order-service 에서는 주문을 했지만 catalog-service 에는 상품 수량에 대한 관리가 이루어지지 않는다.
  • 이러한 문제를 해결하기 위해서 Kafka 를 통해 데이터 동기화를 하려고 한다.
  1. order-serivce 에 요청된 주문의 수량 정보를 catalogs-service 에 반영
  2. oder-service 에서 kafka Topic 으로 메시지 전송 -> Producer
  3. caltalog-service 에서 Kafka Topic 에 전송 된 메시지 취득 -> Consumer

order-service -> Meessage Queuing Service(Kafka) -> catalog-service

Consumer (catalogs-service)

  1. spring kafka 의존성 추가
  2. KafkaConsumerConfig 생성
  • KafkaConsumerConfig 에는 Topic 에 접속하기 위해 필요한 정보들을 빈으로 등록
    -> 데이터를 지정해줄 때 토픽에 저장되는 값 자체가 어떠한 형태로 등록되는지 정해줘야 한다.
    -> 우리는 JSON 형식으로 등록되기 때문에 Key , Value 가 한 세트가 토픽에 저장되어 있을 때 그 값을 가지고 와서 사용하도록 한다.
    -> 예제에서는 String 으로 들어가기 때문에 String Deserializer.class 를 등록
  • 토픽에 어떤 변경사항이 있을 때 그 값을 캐치할 수 있는 리스너도 등록
  1. KafkaConsumer 라는 @Service 를 등록
    -> 토픽에 변경 된 데이터 값을 가지고 와서 실제 DB 에 반영 해야한다.
    -> 그 로직이 있는 method 에 @KafkaListener 를 붙혀주면 된다. (@KafkaListner(topcis="order-topic")
    -> 현재 예제에서는 주문한 상품id , 주문 수량이 넘어올텐데 id 를 가지고 상품을 찾고 현재 상품의 개수에서 주문 수량만큼 빼주면 데이터 동기화가 된다.

Producer (order-service)

  1. spring kafka 의존성 추가
  2. KafkaProducerConfig 생성
    -> 카프카 토픽에 데이터를 보내기 위해서 KafkaTemplate 을 등록
  3. KafkaProducer 생성 @Service
  4. topic 에 전달 할 때 kafkaTemplate 을 통해 보내는데, JSON 형태로 보내야하기 때문에 변환을 하고 보내면 된다.
  • kafkaTemplate.send() 를 통해서 보내면 된다.
  • 정해져있는 타입에 맞춰서 보내야한다. (전에 봤었던 JSON 포맷)
  1. order-service 에서 createOrder() 부분에 원래 Order 객체를 만드는 부분만 있었는데 여기에 topic 에 보내는 코드까지 포함시키면 된다.

Catalogs Microservice 수정

  • 우선 spring-kafka 의존성 추가
// Kafka 사용
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String,String> consumerFactory() {
        Map<String,Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 서버 주소
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // 여러 가지 컨슈머가 있을 경우 groupId 로 구분
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(properties);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();
        // 위에서 만든 Factory 등록
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        return kafkaListenerContainerFactory;
    }
}
  • @EnableKafka 를 등록해주면서 Kafka 를 사용한다고 알려준다.
  • topic 에 접근하기 위한 정보들을 consumerFactory() 를 Bean 으로 등록
  • DESERIALIZER (역직렬화) -> 받는 쪽이기 때문에 역직렬화 사용
  • 그렇게 만든 consumerFactory() 를 KafkaListenerContainerFactory 에 넣어준다.
// KafkaConsumer (@Service)

// 해당 토픽에 데이터가 전달되면 그 데이터가 값을 가지고 와서 updateQty 가 실행된다.
    @KafkaListener(topics = "example-catalog-topic")
    public void updateQty(String kafkaMessage) {
        log.info("Kafka Message: ->" + kafkaMessage);

        Map<Object , Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        // Kafka Message 는 String 형태로 들어오지만 그것을 Json 으로 변경하기 위한 로직
        // 변경하다가 예외가 발생할 수 있기 때문에 try catch 로 잡아준다.
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        CatalogEntity entity = repository.findByProductId((String) map.get("productId"));
        // Null 체크를 하는것도 중요하다.
        // if (entity != null) ...

        if (entity != null) {
            entity.setStock(entity.getStock() - (Integer) map.get("qty"));
            repository.save(entity);
        }
    }
  • topic 을 example-catalog-topic 으로 할 예정이기 때문에 @KafkaListener 인자에 넣어준다.
  • KafkaMessage 는 String 으로 넘어오기 때문에 해당 값을 JSON 으로 바꾸고 그 map 객체에 담는다.
  • map 객체에서 productId , qty 를 가지고 데이터 동기화를 해주면 된다.

Order Microservice 수정

  • 똑같이 먼저 spring-kafka 의존성 추가
// Kafka 사용
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String,String> producerFactory() {
        Map<String,Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // 서버 주소
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String , String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • SERIALIZER (직렬화) -> 먼저 보내는 쪽이기 때문에 직렬화
  • KafkaTemplate 등록할 때 위에서 만든 Factory 전달
    -> topic 에 접속할 수 있는 정보가 필요하기 떄문
// KafkaProducer.class

// topic 에 order 의 정보를 보내는 method
    public OrderDto send(String topic , OrderDto orderDto) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            // json format 으로 변경
            jsonInString = mapper.writeValueAsString(orderDto);
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic , jsonInString);
        log.info("kafka Producer sent data from the Order microservice : " + orderDto);
        return orderDto;
    }
  • kafkaTemplate.send() 를 통해 보내면 된다.

Kafka 를 활용한 데이터 동기화 테스트 1

  1. 주키퍼 서버 기동
  2. 카프카 서버 기동
  3. 유레카 서버 기동
  4. config-service 기동
  5. apigateway-service 기동
  6. order-serivce 기동
  7. caltalog-service 기동

모든 서버 및 서비스를 기동한 후 POSTMAN 으로 order-service 에서 order 를 만든다.

  • 그런후에 catalog-service 에서 h2 DB 에 들어가보면 order 한 수량 만큼 감소한것을 확인했다.
  • 데이터 동기화가 되는 것이다.

만약 orderService 가 1개가 아니라 여러개의 orderService 를 기동하는 경우에는 어떻게 Kafka 를 사용해야하는지 알아보자.

Multi Order Microservice 사용에 대한 데이터 동기화 문제

  • order-service 를 2개 기동
  • User 의 요청 분산 처리
  • Order 데이터도 분산 저장 -> 동기화 문제

order-service 를 2개 기동 (A,B)

  • order-service 에서 5번 order 를 한다고 가정해보자.
  • 그렇다면 A-service 로 3번 , B-service 로 2번의 order 데이터가 이동한다.
  • 이 이유는 전에 배운 내용이다.

이때 더 큰 문제는 user-service 에서 user-service/users/{uesrId} 를 요청하면 한 번은 A-service 로 가서 3개의 order 데이터를 받고 그 다음은 B-service 로 가서 2개의 order 데이터를 받게 된다.

  • 이러한 문제를 해결하기 위해서는 Kafka Connect 를 활용하여 단일 데이터베이스를 사용해야한다.

Kafka Connect 를 활용한 단일 데이터베이스를 사용

  • OrderService 에 요청된 주문 정보를 DB 가 아니라 Kafka Topic 으로 전송
  • Kafka Topic 에 설정된 Kafka Sink Connect 를 사용해 단일 DB에 저장 -> 데이터동기화

order-service 의 JPA 데이터베이스 교체

  • H2 -> MariaDB 로 바꾸기
  • 바꾼 후 orders table 을 생성해야한다.

Orders Microservice 수정 (DB 변경)

spring:
  jpa:
    hibernate:
      ddl-auto: update
  datasource:
    url: jdbc:mysql://localhost:3306/mydb
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: 
  • order-service.application.yml 에서 위와 같이 변경하면 Mysql 로 변경할 수 있다.
  • mydb 에 접속해서 create table orders 로 orders 라는 테이블도 만들었다.

Orders Microservice 수정 (Orders Kafka Topic)

  • 아래에 있는 order-service 의 Controller 에서 createOrder() 부분을 수정해야한다.
// order-service.controller.createOrder() 일부분
OrderDto createdOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = modelMapper.map(createdOrder , ResponseOrder.class);

위에 코드는 Order 객체를 만들고 DB 에 저장을 한 다음에 ResponseOrder 로 반환시키는 부분이다.

이부분을 Kafka 에다가 메시지를 보내도록 바꾼다.

  • orderProducer 를 구현하고 orderProducer.send() 를 사용하면 된다.

우리가 가지고 있던 주문 정보를 어떻게 topic 에 보낼지가 중요하다.

  • 왜냐하면 topic 에 쌓였던 메시지들은 Sink Connector 에 의해서 불려지고 그 Sink Connector 가 토픽에 있었던 메시지의 내용들을 열어보고 어떻게 저장되어 있는지 파악을 gkrh JDBC Connector 에 저장한다.
  • 이때 만약 일정한 format 대로 저장되어 있지 않으면 정상적으로 전달되지 않는다.

그래서 topic 에 보내기 위해서는 위와 같은 format 을 맞춰야한다.

  • Schema : type , fields , optional , name
  • fields : type , optional , field
  • payload : order_id , user_id , product_id , qty , total_price , unit_price
  • Schema , Payload 라는 클래스를 만들고 KafkaOrderDto 라는 곳에 필드로 선언하면 된다.

그런후 OrderProducer 를 생성한다. (@Service)

  • 여기에 위에서 만든 Schema , List< Field> fields 를 필드에 넣는다.
  • kafkaTemplate 을 사용해서 topic 에 전달하고 싶은 내용을 보낼 수 있다.
  • 그런후 order-service 에서 Payload 는 OrderDto 로 값을 채워서 만들고 schema 는 OrderProcuder 에서 초기화할 때 넣은 값들을 사용한다.
  • 그렇게 KafkaOrderDto 에 넣고 Dto 를 생성한 후 mapper.wirteValueAsString 으로 객체를 JSON 으로 만든다.
  • 똑같이 kafkaTemplate.send() 로 보낼 수 있다.

order-serivce 를 위해서 kafka sink connector 를 기동하고 order-service 를 위한 connector 를 추가해야한다.

  • 즉 order-service 들을 데이터를 kafka topic 에 던지고 topic 에 있는 메시지를 sink connector 가 가지고 와서 단일 데이터베이스에 저장을 시켜줄 수 있다.
  • topics 에 orders 로 해야하는데 이게 table 이름이다.

Order Microservice 수정 (Order Kafka Producer)

  • Schema , Field , Payload 클래스를 만들고 format 에 맞는 필드를 선언한다.
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
    private Schema schema;
    private Payload payload;
}
  • KafkaOrderDto 가 Schema , Payload 를 가지고 있다.
  • Schema 안에 List 형태의 Fields 를 가진다.
@Service
@Slf4j
public class KafkaOrderProducer {

    private KafkaTemplate<String,String> kafkaTemplate;

    List<Field> fields = Arrays.asList(
            new Field("string",true,"order_id"),
            new Field("string",true,"user_id"),
            new Field("string",true,"product_id"),
            new Field("int32",true,"qty"),
            new Field("int32",true,"unit_price"),
            new Field("int32",true,"total_price")
    );

    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();

    // topic 에 order 의 정보를 보내는 method
    public OrderDto send(String topic , OrderDto orderDto) {

        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .total_price(orderDto.getTotalPrice())
                .user_id(orderDto.getUserId())
                .unit_price(orderDto.getUnitPrice())
                .build();

        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema,payload);
        // Schema , Payload 가 담긴 메시지를 만들어야 한다.
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            // json format 으로 변경
            jsonInString = mapper.writeValueAsString(kafkaOrderDto);
        } catch (JsonProcessingException ex) {
            ex.printStackTrace();
        }

        kafkaTemplate.send(topic , jsonInString);
        return orderDto;
    }
}
  • KafkaOrderProducer 에서 fields 와 Schema 에 대한 정보를 초기화를 하고 send() 에서 Payload 값을 OrderDto 를 통해서 채운다.
  • schema , payload 를 통해 KafkaOrderDto 를 만들고 그 Dto 를 JSON 으로 바꿔서 kafkaTemplate.send() 에 담아서 보내면 된다.
  • KafkaOrderDto 는 해당 코드 위에 있는 클래스이다.
  • 이렇게 우리는 orders 라는 topic 에 JSON 형태로 데이터를 던진 상황이고 이때 Sink Connector 를 하나 등록하게 되면 topic 에 있는 데이터를 읽어와서 단일 DB 에 저장하면 된다.

Sink Connector 등록

{
    "name":"my-order-sink-connector",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"orders"
    }
}
  • name 은 my-order-sink-connector 로 생성
  • topics 에 orders 로 지정
  • 127.0.0.1:8083/connectors POST 방식으로 넘기면 된다.
  • 127.0.0.1:8083/connectors GET 방식으로 생성된 Connector 를 확인할 수 있다.

Kafka 를 활용한 데이터 동기화 테스트 2

이제 다시 order-service 를 2개 기동해본다.

  • 다시 order-service/{userId}/orders endpoint 로 주문을 요청한다.
  • kafka-console-consumer 를 기동하면 어떠한 값들이 들어왔는지 확인할 수 있다.
  • 1번째 , 2번째 주문은 order-serviceA 에서 실행
  • 3번쨰 주문은 order-serviceB 에서 실행
  • 하나의 DB 에 값이 저장되는 것을 확인

POSTMAN 에서 /order 를 실행해보면 order-service 2개를 번갈아가면서 호출하는 것을 확인했다.

  • 하지만 Mysql(MariaDB) 에 들어가서 select * from orders 쿼리를 실행해보면 모든 order 가 나오는 것을 알 수 있다.
  • 그런후 user-service/user/{userId} GET 으로 해당 user 의 주문 내역을 가지고 와보면 모든 주문이 다 들어와있는 것을 알 수 있다.
  • 단일 DB 를 사용하기 때문에 모든 주문을 확인할 수 있다.

참고자료

profile
덕업일치

0개의 댓글