MSA Phase 8. CQRS

devty·2023년 10월 19일
0

MSA

목록 보기
12/14

서론

기능 구현 후 발생할 수 있는 문제 상황

  • MSA의 태생적인 한계
    • 모든 서비스는 각 프로덕트로서 도메인을 책임진다.
    • 서비스/도메인 간 협럽과 디펜던시가 지양된다.
    • 또한 각 서비스의 DB는 각 서비스를 통해서 접근해야한다. (데이터 오너쉽)
  • 지금까지의 쿼리는 각 서비스내에서 모두 처리가 가능한 정도의 쿼리였다.
  • 하지만 프로덕트가 커지면 커질수록 복잡한 비즈니스 로직이 많아질텐데 모놀로식 아키텍처에서는 한개의 DB에서 Table간 Join이 가능하여 처리가 가능하였지만, MSA에서는 각 서비스 별로 서로 다른 DB를 사용하기에 Query로 Join은 어려움 점이 있다.
    • 이러한 부분을 처리하기 위해 MSA에서는 두가지 방법 사용한다.
      1. API Aggregation Pattern
      2. CQRS Pattern

API Aggregation Pattern

  • API Aggregation Pattern은 여러 서비스로부터 정보를 모아서 하나의 응답으로 제공하는 패턴이다.
  • MSA 환경에서는 서비스 간의 상호작용이 많기 때문에 이러한 패턴이 필요하게 된다.
  • 장점
    • 클라이언트는 여러 서비스로부터 데이터를 요청하는 대신 한 번의 요청으로 필요한 모든 데이터를 얻을 수 있다.
    • 클라이언트의 로직이 단순해진다. 서비스 간의 상호작용이 API 게이트웨이 내에서 수행되기 때문이다.
      • 프론트는 어차피 API 게이트웨이를 통해서 받기에 프론트가 고민해야할 문제는 아니다.
    • 각각의 마이크로서비스는 독립적으로 진화하며, API 게이트웨이만 적절히 업데이트하면 됩니다.
  • 단점
    • API 게이트웨이가 병목 지점이 될 수 있습니다.
      • MSA에 모든 트래픽이 API 게이트웨이를 지나기에 여러 서비스에 대한 요청을 하면 네트워크 지연이 있을 가능성이 높다.
    • 여러 서비스의 응답을 모아 처리하는 로직이 복잡해질 수 있습니다.

CQRS(Command Query Responsibility Segregation) Pattern

  • CQRS는 시스템의 명령(Command) 부분과 조회(Query) 부분을 명확히 분리하는 패턴이다.
  • MSA 환경에서 도메인 복잡성을 관리하고 확장성을 향상시키기 위해 사용된다.
  • 장점
    • 명령과 조회를 분리함으로써 각각의 부분을 독립적으로 확장할 수 있다.
      • MSA에서 지향하는 방식과 너무나도 잘 어울린다.
    • 읽기와 쓰기 모델을 분리함으로써 각 모델에 최적화된 성능 튜닝이 가능한다.
    • 읽기 전용 모델을 따로 설계할 수 있기 때문에, 조회 성능과 구조를 최적화할 수 있다.
    • CQRS는 이벤트 소싱과 잘 결합됩니다. 이로 인해 시스템의 상태 변경 내역을 추적하고 재생할 수 있다.
  • 단점
    • 전통적인 CRUD 패턴보다 구현이 복잡해질 수 있다.
      • CQRS와 EDA가 거의 한몸처럼 움직이는데 두개 모두 구사하는건 어려움이 크다.
    • 명령과 조회 모델을 동기화하는 것이 도전적일 수 있다.

두개 Pattern 중 어떤걸 사용해야 하는가?

  • 둘다 MSA에서 많이 사용하는 패턴들이다.
  • 하지만 각각 사용하는 곳에 대한 느낌이 다른데 제가 생각하기엔 아래와 같을 것 같습니다.
    • API Aggregation
      • 쉽고 직관적인 면이 있어서 팀 단위에서 작업하기가 편하다.
      • 하지만, 서비스간 여러 호출한 상태에서 하나라도 호출이 실패했을 때 롤백하는게 어려울 수 있음. → 트랙잭션 관리 어려움.
    • CQRS
      • API Aggregation에서 부하가 길어질 경우에 CQRS를 많이 사용한다.
      • 하지만, 구현하기 어려울 뿐더러 팀 단위로 모두가 알고 있어야할 이론이 많이에 어렵다.
  • 따라서 모든 선택은 요구사항을 완전히 파악 후에 현재의 상태를 종합적으로 고려해야한다.
    • 여기서 말하는 현재 상태는 팀원들과 프로젝트 기술스택에 따라 다르다.
  • 우리는 둘다 구현을 해보고 시간이 얼마나 걸리며 어느정도 작업 수행을 해야하는지 판단해보자.

본론

예시

  • 예를 들어서 MSA환경에서 밑과 같은 상황이 있다고 가정해 보겠다.
  • 실시간으로 주문한 사람에 대한 해당 구에 총 주문 개수를 알고 싶다면?
  • 주문을 완료 했다면, 그 주문한 유저의 구를 찾아서 해당 구에 대한 전체 주문 개수를 찾는 비즈니스 로직이 있다.
    • MSA에서 순서는 아래와 같다.
      1. 고객이 소속된 “구” 정보 확인
      2. “해당 구” 주소를 가진 고객들 정보 확인
      3. 포함된 모든 유저에 대한 주문 정보 확인
      4. 모든 주문 개수 SUM
  • 이제 위와같은 경우에서 Aggregation, CQRS를 구현해보자.

API Aggregation Pattern Diagram

  • Diagram
    1. 고객이 소속된 “구” 정보 확인(Order-Aggregation-Servie)
    2. “해당 구” 주소를 가진 고객들 정보 확인(User-Servie)
    3. 포함된 모든 유저에 대한 주문 정보 확인(Order-Servie)
    4. 모든 주문 개수 SUM(Order-Aggregation-Servie)
  • 해당 구조로 만들면 되는데 우리는 여기서 Order-Aggregation-Servie 모듈만 새롭게 만들어서 작업을 진행하면 된다.
  • 근데 우리는 생각해야할 부분이 있다.
    • 2번에서 해당 구를 가진 고객이 분명 N명이 나올텐데, N명이 1000명이라면 Order-Service에서 1000번에 쿼리를 날려줘야한다.
    • 해당 구에 포함된 모든 유저(1000명)을 일일이 다 주문한 개수를 카운팅 해줘야하기 때문이다.
  • 이 점을 유심히 고찰한 뒤 API를 만들어보자.

API Aggregation Pattern 구현 전 사전 작업

  1. User-DB에 데이터 생성하기
  2. Product-DB 데이터 생성하기
  3. Order-DB에 데이터 생성하기
  4. User-Service /user/address API 생성하기 → 주문을 완료한 유저에 해당 구를 조회해서 같은 구 유저를 반환 → 이미 존재하는 API라고 가정 하겠다.
  5. Order-Service /order/order_cnt API 생성하기 → 같은 구 유저에 대한 주문 개수 반환 → 이미 존재하는 API라고 가정 하겠다.

Dummy Data 생성하기(User, Product, Order)

  • User
    public class UserDummyDataGenerator {
    
        private static final String DB_URL = "jdbc:mariadb://localhost:3306/user-db";
        private static final String DB_USER = "root";
        private static final String DB_PASSWORD = "taeyun1215";
        private static final String[] ADDRESSES = {"중랑구", "서초구", "강남구"};
        private static final String[] ROLES = {"USER", "ADMIN", "SOCIAL"};
    
        public static void main(String[] args) {
            try {
                Class.forName("org.mariadb.jdbc.Driver");
                Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
                generateDummyUserData(conn);
    
                conn.close();
            } catch (ClassNotFoundException | SQLException e) {
                e.printStackTrace();
            }
        }
    
        private static void generateDummyUserData(Connection conn) throws SQLException {
            String insertQuery = "INSERT INTO user (username, password, nickname, phone, email, address, role) VALUES (?, ?, ?, ?, ?, ?, ?)";
            Random random = new Random();
    
            // 데이터 생성을 위한 PreparedStatement 준비
            try (PreparedStatement pstmt = conn.prepareStatement(insertQuery)) {
                // 더미 데이터 개수 (여기서는 10개로 가정)
                int numberOfDummyData = 1000;
    
                for (int i = 1; i <= numberOfDummyData; i++) {
                    pstmt.setString(1, "user" + i); // username
                    pstmt.setString(2, "pass" + i); // password
                    pstmt.setString(3, "nickname" + i); // nickname
                    pstmt.setString(4, "010-1000-100" + i); // phone
                    pstmt.setString(5, "user" + i + "@example.com"); // email
                    pstmt.setString(6, ADDRESSES[random.nextInt(ADDRESSES.length)]); // address (랜덤하게 선택)
                    pstmt.setString(7, ROLES[random.nextInt(ROLES.length)]);
                    pstmt.executeUpdate();
                }
            }
        }
    }
    • generateDummyUserData → 더미 데이터를 생성하는 메소드이다.
    • inserQuery는 각 요구사항에 맞게 작성하여 진행해주면 된다.
  • 나머지 Product, Order Data도 동일하게 작성해서 처리 하면 된다.

Order-Aggregation-Service

  • 디테일한 부분 말고 크게 설명하겠다. 그리고 클린 아키텍처로 작업이 되어있어서 그 점 참고하면 될 것 같다.
  • OrderCntSumController
    @WebAdapter
    @RestController
    @RequiredArgsConstructor
    @RequestMapping("/orders")
    public class OrderCntSumController {
    
        private final OrderCntSumByAddressUseCase orderCntSumByAddressUseCase;
    
        @PostMapping("/aggregation/get-order-cnt-sum-by-address")
        public ResponseEntity<ReturnObject> OrderCntSumByAddress(
                @RequestBody OrderCntSumByAddressRequest request
        ) {
            OrderCntSumByAddressCommand command = OrderCntSumByAddressCommand.builder()
                    .address(request.getAddress())
                    .build();
    
            long startTime = System.currentTimeMillis();
    
            int orderAmountSum = orderCntSumByAddressUseCase.OrderCntSumByAddress(command);
    
            long endTime = System.currentTimeMillis();
            System.out.println("Order-aggregation-service API Call time: " + (endTime - startTime) + " ms");
    
            ReturnObject returnObject = ReturnObject.builder()
                    .success(true)
                    .data(orderAmountSum)
                    .build();
    
            return ResponseEntity.status(HttpStatus.OK).body(returnObject);
        }
    }
    • 기본적인 Controller이다.
    • OrderCntSumByAddressCommand를 통해 명령을 OrderCntSumByAddressUseCase로 보내게 된다.
    • 시간에 관련된 메소드들을 API 호출 시간을 보기 위함이다.
  • OrderAmountSumByAddressService
    @Slf4j
    @UseCase
    @Transactional
    @RequiredArgsConstructor
    public class OrderAmountSumByAddressService implements OrderCntSumByAddressUseCase {
    
        private final GetUserPort getUserPort;
        private final GetOrderPort getOrderPort;
    
        @Override
        public int OrderCntSumByAddress(OrderCntSumByAddressCommand command) {
    
            List<Long> userIds = getUserPort.getUserIdByAddress(command.getAddress());
    
            List<List<Long>> userPartitionList = null;
            if (userIds.size() > 100) {
                userPartitionList = partitionList(userIds, 100);
            }
    
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
    
            for (List<Long> partitionedList : userPartitionList) {
                CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                    List<Integer> orderCntList = getOrderPort.getOrderCntByUserIds(partitionedList);
                    return orderCntList.stream().mapToInt(Integer::intValue).sum();
                });
                futures.add(future);
            }
    
            int orderCntSum = futures.stream()
                    .map(CompletableFuture::join) 
                    .mapToInt(Integer::intValue)
                    .sum();
    
            return orderCntSum;
        }
    
        private static <T> List<List<T>> partitionList(List<T> list, int partitionSize) {
            return IntStream.range(0, list.size())
                    .boxed()
                    .collect(Collectors.groupingBy(index -> index / partitionSize))
                    .values()
                    .stream()
                    .map(indices -> indices.stream().map(list::get).collect(Collectors.toList()))
                    .collect(Collectors.toList());
        }
    }
    • 기본적인 Service 즉, 비즈니스 로직을 처리하는 부분이다.
    • getUserPort.getUserIdByAddress(command.getAddress()), getOrderPort.getOrderCntByUserIds(partitionedList) 이 두 부분은 밑에서 설명하겠다.
    • 여기서 중점적으로 봐야하는 부분은 partitionList(userIds, 100)여기이다.
      • 왜냐하면 우린 API를 만들기 전에 고찰이 필요하다고 했었다. (Query 1000번 실행)
      • 이 부분을 최대한 줄이기 위해 Chunk Size를 100으로 두어 Query를 1000번 실행하는 것이 아닌 1000/100 = 10번만 실행하는 것으로 최대한 네트워크 지연을 피할수 있다.
      • List 를 n개씩 묶어서 List<List>로 만들어준다.
  • UserServiceAdapter
    @ExternalSystemAdapter
    @RequiredArgsConstructor
    public class UserServiceAdapter implements GetUserPort {
    
        ObjectMapper mapper = new ObjectMapper();
        private final CommonHttpClient commonHttpClient;
        private final String userServiceUrl = "http://localhost:8000/user-service";
    
        @Override
        public List<Long> getUserIdByAddress(String address) {
            String url = String.join("/", userServiceUrl, "users/address", address);
            try {
                String jsonResponse = commonHttpClient.sendGetRequest(url).body();
                Map<String, Object> responseMap = mapper.readValue(jsonResponse, new TypeReference<Map<String, Object>>() {});
                List<Long> userIds = (List<Long>) responseMap.get("data");
    
                return userIds;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    • 다른 서비스와에 통신을 위한 외부 어댑터 클래스이다.
    • User-Service에서 해당 url API를 통해서 userIds를 가져오게 된다.
    • 동기처리로 구현해두었다.
      • 이 부분을 비동기 처리로 구현을 할까 했는데 어차피 Gateway 서버인 localhost:8000에 병목현상을 초래하는 것은 동일한 느낌이라 비동기로 처리하지 않았다.
  • OrderServiceAdapter
    @ExternalSystemAdapter
    @RequiredArgsConstructor
    public class OrderServiceAdapter implements GetOrderPort {
    
        ObjectMapper mapper = new ObjectMapper();
        private final CommonHttpClient commonHttpClient;
        private final String orderServiceUrl = "http://localhost:8000/order-service";
    
        @Override
        public List<Integer> getOrderCntByUserIds(List<Long> userIds) {
            String url = String.join("/", orderServiceUrl, "orders/order_count");
            try {
                Map<String, List<Long>> payload = Collections.singletonMap("userIds", userIds);
                String jsonPayload = mapper.writeValueAsString(payload);
                Map<String, Object> responseMap = commonHttpClient.sendAsyncPostRequest(url, jsonPayload);
    						List<Integer> orderCntList = (List<Long>) responseMap.get("data");
    
                return orderCntList;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    • 여기도 마찬가지로 외부 어탭더를 받는 부분이다.
    • 다른 점이 있다면 해당 API는 Post Mapping으로 매개변수를 넣어줘야하는데, 넣어줄 땐 Map으로 변환시켜서 넣어주었다.

Order-Aggregation-Service 결과

  • 이제 모든 데이터와 Aggregation Pattern에 대한 API까지 완성을 하였다.
  • 시간을 측정해보자.
    Order-aggregation-service API Call time: 24412 ms
  • 위 시간을 보면 ms이라 24412/100 = 24.4s가 걸린것으로 파악이 된다.
  • 사실 실시간으로 계속해서 데이터를 받아야하는 API가 20초가 걸린다는 건 말이 안 되긴 한다.
  • 만약 1분 주기로 업데이트가 된다면 괜찮지만, 지금 처럼 실시간으로 받아야하거나 데이터 양이 방대해지면 해당 Pattern으로는 작업이 불가능 할 것 이다.

CQRS Pattern Diagram

  • Diagram
    • 우리는 일단 Axon, EDA에 대해 알고 넘어가야한다.
    • Axon은 명령과 쿼리 책임을 명확하게 분리하며, 이벤트 소싱 및 이벤트 중심 아키텍처를 지원하는 프레임워크이다.
    • EDA(Event-Driven Architecture)는 시스템 내 다양한 구성 요소가 이벤트를 통해 비동기적으로 통신하게 하여, 도메인 이벤트의 발생과 반응을 기반으로 동작한다.
    • COMMAND SIDE
      • Command Gateway: 명령 요청의 진입점이다.
      • Command Handler: 특정 명령을 처리하는 로직이 위치한다.
      • Repository(EventSourcing): 이벤트 소싱을 사용한 저장소로, 도메인 이벤트를 저장한다.
      • Event Handler: 저장된 이벤트를 처리하고 필요한 추가 로직을 실행한다.
      • Write DB: 실제 데이터 변경이 발생하는 쓰기 전용 데이터베이스이다.
    • QUERY SIDE
      • Query Gateway: 쿼리 요청의 진입점이다.
      • Query Handler: 특정 쿼리를 처리하는 로직이 위치한다.
      • Repository(Persistence): 쿼리 요청에 응답하기 위해 데이터를 검색하는 저장소이다.
      • Read DB: 읽기 전용 데이터베이스이다.
    • 둘다 Command Gateway, Query Gateway가 존재하는 이유는 비동기 처리를 위함이다.
    • 실시간으로 주문한 사람에 대한 해당 구에 총 주문 개수를 알고 싶다면? 를 위 이미지처럼 구현하려면 다음과 같다.
      1. 주문을 완료했다면 Command Gateway를 통해 들어오고 Command Handler로 전달됩니다.
      2. Command Handler는 Repository(Event Sourcing)를 통해 이벤트 소싱 방식으로 이벤트를 Write DB에 저장합니다.
      3. 저장된 이벤트는 Event Handler에 의해 처리되며 필요한 경우 Read DB(”행정구”에 대한 총 주문 개수)에 데이터를 갱신할 수 있습니다.
      4. 사용자가 데이터 조회를 요청하면, Query Gateway를 통해 들어오는 요청은 Query Handler로 전달되며, Repository를 통해 Read DB(”행정구”에 대한 총 주문 개수)에서 데이터를 조회하여 응답합니다.

CQRS Pattern 구현 전 사전 작업

  1. 일단 위에서 각 DB에 들어갈 더미 데이터를 생성하였으니 넘어가도록하겠다.
  2. Axon Framework에 대한 기본적인 세팅
  3. Axon Server Docker로 띄어두기

Axon Framework 세팅하기

  • build.gradle
    dependencies {
        // Axon
        implementation 'org.axonframework:axon-spring-boot-starter:4.8.0'
    }
    • CQRS를 사용할 모듈 Build.gradle에 dependencies로 Axon을 추가한다.
  • application.yml
    axon:
      axonserver:
        servers: localhost:8124 # 비워두면 Axon Server에 연결하지 않도록 설정
        enabled: true
      serializer:
        events: jackson
        messages: jackson
        general: xstream
    • Axon Server에 관련된 config를 적어둔다.
    • Axon serializer는 직렬화에 관련된 부분을 적어두었다.
  • Axon Server Docker Image
    • docker pull axoniq/axonserver
      • axonserver docekr image를 내려받는다.
    • docker run -d --name axonserver -p 8024:8024 -p 8124:8124 axoniq/axonserver
      • d는 백그라운드에서 컨테이너를 실행하라는 옵션다.
      • -name axonserver는 컨테이너의 이름을 지정하는 옵션이다.
      • p 8024:8024와 p 8124:8124는 호스트와 컨테이너의 포트를 매핑하는 옵션이다.
      • 8024 포트는 Axon Server의 웹 UI에 접속하기 위한 포트이며, 8124 포트는 애플리케이션과의 통신을 위한 포트이다.

Order-CQRS-Service

  • 실행에 대한 순서대로 코드를 보여주도록하겠다.
  • RegisterOrderService
    @Slf4j
    @UseCase
    @Transactional
    @RequiredArgsConstructor
    public class RegisterOrderService implements RegisterOrderUseCase {
    
        private final SaveOrderItemPort saveOrderItemPort;
    
        @Autowired
        private CommandGateway commandGateway;
    
        @Override
        public void registerOrder(CreateOrderCommand command) {
            Order order = Order.builder()
                    .receiverName(command.getReceiverName())
                    .receiverPhone(command.getReceiverPhone())
                    .receiverAddress(command.getReceiverAddress())
                    .userId(command.getUserId())
                    .orderStatus(OrderStatus.ORDER_CREATED)
                    .build();
    
            Order saveOrder = saveOrderPort.saveOrder(order);
    				
    				// 각종 비즈니스 로직 처리...
    
            command.setOrderId(saveOrder.getOrderId());
            commandGateway.sendAndWait(command);
        }
    }
    • 우리가 위에서 생각했던 Diagram에서 COMMAND SIDE 부분중 가장 첫 부분을 담당한다.
    • Command(명령)을 Command Gateway를 통해 전달하게 된다.
  • OrderAggregate
    @Slf4j
    @Aggregate
    @NoArgsConstructor
    public class OrderAggregate {
    
        @AggregateIdentifier
        private Long orderId;
    
        private OrderStatus orderStatus; // 주문 상태 (ex: CREATED, COMPLETED, CANCELLED)
        private List<OrderCreatedEvent.OrderItemInfo> orderItems;
    
        @CommandHandler
        public OrderAggregate(CreateOrderCommand command) {
            this.orderId = command.getOrderId();
            apply(new OrderCreatedEvent(command.getOrderId(), command.getUserId(), command.getOrderItemInfos()));
        }
    
        @EventSourcingHandler
        public void on(OrderCreatedEvent event) {
            this.orderId = event.getOrderId();
            this.orderItems = event.getOrderItems();
            this.orderStatus = OrderStatus.ORDER_CREATED;
        }
    }
    • 해당 코드는 COMMAND SIDE 부분중 두번째, 세번째를 담당해준다.
    • @Aggregate → Axon에서 해당 어노테이션이 붙은 클래스는 이벤트 소싱 및 CQRS 패턴에서 이벤트를 생성하고 처리하는 역할을 담당한다.
    • @AggregateIdentifier → Aggregate의 유일한 식별자를 나타낸다.
    • Command Gateway를 통해 전달한 Command가 Command Handler를 통해 Command를 처리하고 그 결과로 Event를 발행한다.
    • Event는 apply 메소드를 통해 발행이 되고 Event Sourcing Handler를 통해 Event를 처리하고 Event를 저장한다.
  • OrderRegisterEventHandler
    @Slf4j
    @Component
    public class OrderRegisterEventHandler {
    
        @EventHandler
        public void handler(
                OrderCreatedEvent event,
                GetUserPort getUserPort,
                InsertOrderCntStatePort insertOrderCntStatePort
        ) {
            String address = getUserPort.getAddressByUserId(event.getUserId());
            insertOrderCntStatePort.InsertOrderCntByAddress(address, event.getOrderItems().size());
        }
    }
    • 해당 코드는 COMMAND SIDE 부분중 네번째 담당해준다.
    • 위에서 Event Sourcing Handler를 통해 Event에 상태를 변경했다면 이제는 Event Handler를 통해 비즈니스 로직을 처리한다.
    • getUserPort.getAddressByUserId(event.getUserId()) → 주문한 사람의 userId값을 매개변수값으로 넣어주면 그 사람의 해당하는 구를 반환해준다.
      • 해당 부분은 Aggregation에서도 사용한 기술이니 넘어가도록 하겠다.
    • insertOrderCntStatePort.InsertOrderCntByAddress(address, event.getOrderItems().size()) → Read DB에 변경 사항을 update 해준다.
  • OrderCntSumCommandAdapter
    @PersistenceAdapter
    @RequiredArgsConstructor
    public class OrderCntSumCommandAdapter implements InsertOrderCntStatePort {
    
        private final OrderCntSumByAddressRepo repo;
    
        @Override
        public void InsertOrderCntByAddress(String addressName, int increaserderCnt) {
            OrderCntSumByAddress orderCntSumByAddress = repo.findByAddress(addressName)
                    .orElseGet(() -> createNewRecord(addressName));
    
            orderCntSumByAddress.setOrderCnt(orderCntSumByAddress.getOrderCnt() + increaserderCnt);
            repo.save(orderCntSumByAddress);
        }
    
        private OrderCntSumByAddress createNewRecord(String addressName) {
            OrderCntSumByAddress newRecord = OrderCntSumByAddress.builder()
                    .address(addressName)
                    .orderCnt(0L)
                    .build();
    
            return repo.save(newRecord);
        }
    • 해당 코드는 COMMAND SIDE 부분중 다섯번째 담당해준다.
    • 우리가 해야할 Task는 크게 두가지가 존재하고 있다.
    • Task 1 : 주소에 대해 RDB컬럼이 있는 지 확인하고 없다면 새로운 레코드를 만들기
    • Task 2 : 기존 레코드를 가져와서 연산을 해주고 다시 저장(업데이트)하기
    • 따라서 해당 Task 대로 코드를 짠 것이다.
  • 여기까지가 Event에 대한 처리가 끝났다.
  • 이제는 주문이 완료 될 때마다 Event가 발행되어 해당 구에 총 주문 개수를 계속 업데이트 해줄 것이다.
  • OrderCqrsController
    @WebAdapter
    @RestController
    @RequiredArgsConstructor
    @RequestMapping("/orders")
    public class OrderCqrsController {
    
        private final OrderCntSumUseCase orderCntSumUseCase;
    
        @GetMapping("/cqrs/get-order-cnt-sum-by-address/{address}")
        public ResponseEntity<ReturnObject> OrderCntSumByAddress(
                @PathVariable("address") String address
        ) {
            OrderCntSumByAddressQuery query = OrderCntSumByAddressQuery.builder()
                    .address(address)
                    .build();
    
            long startTime = System.currentTimeMillis();
    
            Long orderCntSumByAddress = orderCntSumUseCase.orderCntSumByAddress(query);
    
            long endTime = System.currentTimeMillis();
            System.out.println("Order-CQRS-Service API Call time: " + (endTime - startTime) + " ms");
    
            ReturnObject returnObject = ReturnObject.builder()
                    .success(true)
                    .data(orderCntSumByAddress)
                    .build();
    
            return ResponseEntity.status(HttpStatus.OK).body(returnObject);
        }
    }
    • 해당 코드는 QUERY SIDE 부분중 첫번째 담당해준다.
    • 간단한 조회용 API이다. 어려운 내용은 없지만 집고 넘어가야할 부분이 있어서 적어보겠다.
    • OrderCntSumByAddressQuery → 클래스 명에 Query를 붙여주면서 해당 클래스가 조회를 하기 위한 클래스라는 것을 의미한다.
    • orderCntSumUseCase → UseCase는 Interface라 클래스 명을 포괄적으로 두었고 클래스 안에 메소드를 조금 디테일하게 두었다.
  • OrderCntSumService
    @Slf4j
    @UseCase
    @Transactional
    @RequiredArgsConstructor
    public class OrderCntSumService implements OrderCntSumUseCase {
    
        private final QueryGateway queryGateway;
    
        @Override
        public Long orderCntSumByAddress(OrderCntSumByAddressQuery query) {
            return queryGateway.query(query, Long.class).join();
        }
    }
    • 해당 코드는 QUERY SIDE 부분중 두번째 담당해준다.
    • 해당 Query를 Query Gateway를 통해 전달한다.
  • OrderCntSumQueryAdapter
    @PersistenceAdapter
    @RequiredArgsConstructor
    public class OrderCntSumQueryAdapter {
    
        private final OrderCntSumByAddressRepo repo;
    
        @QueryHandler
        public Long orderCntSumByAddress(OrderCntSumByAddressQuery query) {
            return repo.findByAddress(query.getAddress())
                    .map(OrderCntSumByAddress::getOrderCnt)
                    .orElse(null);
        }
    }
    • 해당 코드는 QUERY SIDE 부분중 세번째 담당해준다.
    • Query Gateway를 통해 들어온 Query를 Query Handler를 통해 비즈니스 로직을 처리해준다.
    • 우리는 Event를 통해 연산처리를 미리 해두었기에 “해당 구”에 대한 총 주문 개수만 Select 해주면 된다.

Order-CQRS-Service 결과

  • 이제 모든 데이터와 CQRS Pattern에 대한 Event 처리 및 API까지 완성을 하였다.
  • 시간을 측정해보자.
    Order-CQRS-Service API Call time: 926ms
  • 926ms가 나왔다. 즉, 926ms/100 = 0.9s가 나온다.
  • 위에서 보았던 Aggregation Pattern은 24.4s이고 CQRS Pattern은 0.9s 이다.
  • 따라서 CQRS Pattern을 사용함으로써 (24.4s - 0.9s) / 24.4s * 100% = 96.31%에 시간적 효율성을 얻었다.

결론

사용 용도

  • 내가 생각하기에는 두개 사용용도가 매우 뚜렷한 것 같다.
  • 모든 상황에서 무조건 CQRS가 좋은 것도 아니고 무조건 Aggregation이 좋은 것도 아니다.
  • CQRS는 API 호출이 잦거나, 실시간 데이터 반영, 복잡한 데이터 처리가 필요한 환경에서 특히 강점을 발휘한다.
  • Aggregation은 API 호출은 적고 서버간 통신이 있기에 트랜잭션에 중요도가 낮은 비즈니스 로직에서 사용하는 것이 좋은 것 같다.

후기

  • 내가 처음 CQRS에 눈을 뜬건 해당 유튜브 영상이다. 우아한 형제들에서도 MSA 환경으로 바꿈과 동시에 CQRS + EDA를 사용하여 많은 것들을 처리했다고 한다.
  • MSA(전환 일지) + CQRS + EDA에 대해 관심이 있다면 아래 링크를 꼭 다 보길 권한다. www.youtube.com/watch?v=BnS6343GTkY
profile
지나가는 개발자

0개의 댓글