[MSA] Axon Framework CQRS, API Aggregation

Welcome to Seoyun Dev Log·2024년 11월 29일
0
  • Aggregate(command model): 어그리게이트는 상태를 관리하고 명령(command)을 처리하여 이벤트를 생성한다.
  • Default 생성자는 Axon에서 리플렉션을 통해 Aggregate를 초기화할 때 필요하다.

동작

Command: 명령이 Aggregate에서 처리되며, 이벤트가 발행.
Event: 이벤트가 저장소(Event Store)에 저장되고 Query Model로 전파.
Query: Query Model은 이벤트를 기반으로 데이터를 읽기 데이터베이스에 저장하여 최적화된 읽기 작업을 수행.

command 클래스: 쓰기 작업 요청 정의

package com.yun.money.adapter.axon.command;

import com.yun.common.SelfValidating;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import org.axonframework.modelling.command.TargetAggregateIdentifier;

@Getter
@ToString
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@EqualsAndHashCode(callSuper = false)
public class MemberMoneyCreateCommand extends SelfValidating<MemberMoneyCreateCommand> {
    @NotNull
    @NotEmpty
    @TargetAggregateIdentifier // 명령이 어느 Aggregate에 전달될지 식별
    private String membershipId;

    public MemberMoneyCreateCommand(String membershipId) {
        this.membershipId = membershipId;
        this.validateSelf();
    }
}

Aggregate 클래스

1. Aggregate 클래스: 명령 처리를 위한 클래스
@Aggregate 명시

2. @AggregateIdentifier: ID 필드에 Aggregate의 고유 식별자 (Aggregate ID)
private String id;

3. 명령 핸들러: CreateOrderCommand를 처리
@CommandHandler 명시
public MemberMoneyAggregate(MemberMoneyCreateCommand command) {
    //명령 실행시 command가 존재하는지 유효성 검사를 할 수 있다.
    log.info("MemberMoneyAddCommand Handler");
	//이벤트 발행 (MemberMoneyCreateEvent)
    apply(new MemberMoneyCreateEvent(id, command.getMembershipId()));
}

4. 이벤트 소싱 핸들러: MemberMoneyCreateEvent를 처리하여 상태를 업데이트
@EventSourcingHandler
public void on(MemberMoneyCreateEvent event) {
    log.info("MemberMoneyAddEvent Sourcing Handler");
    id = UUID.randomUUID().toString();
    membershipId = event.getMembershipId();
    balance = 0;
}

5. 다음 명령 핸들러: UpdateOrderStatusCommand를 처리
@CommandHandler
public void handle(UpdateOrderStatusCommand command) {
    // 상태 변경 가능 여부를 검증
    if (!"CREATED".equals(this.status)) {
            throw new IllegalStateException("Order cannot be updated in current state");
    }
    // 이벤트를 발행 (OrderStatusUpdatedEvent)
    apply(new OrderStatusUpdatedEvent(command.getOrderId(), command.getNewStatus()));
}

6. 이벤트 소싱 핸들러: OrderStatusUpdatedEvent를 처리하여 상태를 업데이트
@EventSourcingHandler
public void on(OrderStatusUpdatedEvent event) {
    this.status = event.getNewStatus();
}

Event 클래스 : 상태 변경의 결과

package com.yun.money.adapter.axon.event;

import com.yun.common.SelfValidating;
import com.yun.money.adapter.axon.command.MemberMoneyCreateCommand;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Getter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class MemberMoneyCreateEvent extends SelfValidating<MemberMoneyCreateCommand> {
    private String aggregateIdentifier;
    private String membershipId;
}

Query Model

Query Model은 이벤트를 구독하고 읽기 전용 데이터베이스 업데이트.

package com.yun.moneyqueryservice.adapter.in.axon;

import com.yun.moneyqueryservice.application.port.out.GetMemberAddressInfoPort;
import com.yun.moneyqueryservice.application.port.out.InsertMoneyIncreaseEventByAddress;
import com.yun.moneyqueryservice.application.port.out.MemberAddressInfo;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MoneyAdjustEventHandler {
    //⭐️RequestFirmbankingFinishedEvent를 구독하여 읽기 모델 업데이트
    @EventHandler
    public void handler(RequestFirmbankingFinishedEvent event
            , GetMemberAddressInfoPort getMemberAddressInfoPort
            , InsertMoneyIncreaseEventByAddress insertMoneyIncreaseEventByAddress) {
        log.info("money adjust event receiced : {}", event.toString());

        //고객 주소 정보
        MemberAddressInfo memberAddressInfo = getMemberAddressInfoPort.getMemberAddressInfo(event.getMembershipId());

        //⭐️dynamoDB insert : 읽기 데이터베이스에 주문 저장
        String address = memberAddressInfo.address();
        int moneyAmount = event.getMoneyAmount();
        log.info("dynamo insert: {}, {}", address, moneyAmount);
        insertMoneyIncreaseEventByAddress.insertMoneyIncreaseEventByAddress(address, moneyAmount);
    }
}

DynamoDBAdapter 읽기 전용 DB

package com.yun.moneyqueryservice.adapter.out.aws.dynamodb;

import com.yun.moneyqueryservice.application.port.out.GetMemberAddressInfoPort;
import com.yun.moneyqueryservice.application.port.out.InsertMoneyIncreaseEventByAddress;
import com.yun.moneyqueryservice.application.port.out.MemberAddressInfo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@RequiredArgsConstructor
@Component
public class DynamoDBAdapter implements GetMemberAddressInfoPort, InsertMoneyIncreaseEventByAddress {
    @Value("money.query.table")
    private String TABLE_NAME;
    @Value("money.dynamo.access")
    private String ACCESS_KEY;
    @Value("money.dynamo.secret")
    private String SECRET_KEY;
    private final DynamoDbClient dynamoDbClient;
    private final MoneySumByAddressMapper mapper;

    public DynamoDBAdapter() {
        this.mapper = new MoneySumByAddressMapper();
        AwsBasicCredentials awsBasicCredentials = AwsBasicCredentials.create(ACCESS_KEY, SECRET_KEY);
        this.dynamoDbClient = DynamoDbClient.builder()
                .region(Region.AP_NORTHEAST_2)
                .credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials))
                .build();
    }

    /*@Override
    public void insertMoneyIncreaseEventByAddress(String addressName, int moneyIncrease) {
        //1. raw event insert
        String pk = addressName + "#" + LocalDateTime.now();
        int sk = moneyIncrease;
        putItem(pk, sk);

        //2. 지역 정보 잔액 증가
        //2-1. 지역별/일별 정보
        String summaryPk = pk + "summary";
        int summarySk = -1;
        MoneySumByAddress item = getItem(summaryPk, summarySk);
        if (item == null) {
            putItem(summaryPk, summarySk, moneyIncrease);
        }

        //합산 TODO: domain으로 분리
        int balance = item.getBalance();
        balance += moneyIncrease;
        //update

        //2-2. 지역별 정보
        String addressPk = addressName;
        int addressSk = -1;
        MoneySumByAddress addressItem = getItem(addressPk, addressSk);
        if (addressItem == null) {
            putItem(addressPk, addressSk, moneyIncrease);
        }
        //합산 TODO: domain으로 분리
        int addressItemBalance = addressItem.getBalance();
        addressItemBalance += moneyIncrease;
        putItem(addressPk, addressSk, addressItemBalance);
    }*/

    @Override
    public void insertMoneyIncreaseEventByAddress(String addressName, int moneyIncrease) {
        // 3개의 일을 해야될 것이에요.

        // 1. raw event insert (Insert, put)
        // PK: 강남구#230728 SK: 5,000, balance, 5,000
        String pk = addressName + "#" + "230728";
        int sk = moneyIncrease;
        putItem(pk, sk, moneyIncrease);

        // 2. 지역 정보 잔액 증가시켜야 해요. (Query, Update)
        // 2-1. 지역별/일별 정보
        //  - PK: 강남구#230728#summary SK: -1 balance: + 5,000
        String summaryPk = pk + "#summary";
        int summarySk = -1;
        MoneySumByAddress moneySumByAddress = getItem(summaryPk, summarySk);
        if (moneySumByAddress == null) {
            putItem(summaryPk, summarySk, moneyIncrease);
        } else{
            int balance = moneySumByAddress.getBalance();
            balance += moneyIncrease;
            updateItem(summaryPk, summarySk, balance);
        }

        // 2-2. 지역별 정보
        // - PK: 강남구 SK: -1 balance: + 5,000
        String summaryPk2 = addressName;
        int summarySk2 = -1;
        MoneySumByAddress moneySumByAddress2 = getItem(summaryPk2, summarySk2);
        if (moneySumByAddress2 == null) {
            putItem(summaryPk2, summarySk2, moneyIncrease);
        } else{
            int balance2 = moneySumByAddress2.getBalance();
            balance2 += moneyIncrease;
            updateItem(summaryPk2, summarySk2, balance2);
        }
    }

    @Override
    public MemberAddressInfo getMemberAddressInfo(String membershipId) {
        return null;
    }

    private MoneySumByAddress getItem(String pk, int sk) {
        try {
            HashMap<String, AttributeValue> attrMap = new HashMap<>();
            attrMap.put("PK", AttributeValue.builder().s(pk).build());
            attrMap.put("SK", AttributeValue.builder().n(Integer.toString(sk)).build());

            GetItemRequest request = GetItemRequest.builder()
                    .tableName(TABLE_NAME)
                    .key(attrMap)
                    .build();

            GetItemResponse response = dynamoDbClient.getItem(request);

            if (response.hasItem()) {
                mapper.mapToMoneySumByAddress(response.item());
            }

        } catch (DynamoDbException e) {
            log.error("Error getting an item from the table: {} ", e.getMessage());
        }
        return null;
    }

    private void putItem(String pk, int sk) {
        try {
            HashMap<String, AttributeValue> attrMap = new HashMap<>();
            attrMap.put("PK", AttributeValue.builder().s(pk).build());
            attrMap.put("SK", AttributeValue.builder().s(Integer.toString(sk)).build());

            PutItemRequest request = PutItemRequest.builder()
                    .tableName(TABLE_NAME)
                    .item(attrMap)
                    .build();

            dynamoDbClient.putItem(request);
        } catch (DynamoDbException e) {
            log.error("Error adding an item to the table: {} ", e.getMessage());
        }
    }

    private void putItem(String pk, int sk, int balance) {
        try {
            HashMap<String, AttributeValue> attrMap = new HashMap<>();
            attrMap.put("PK", AttributeValue.builder().s(pk).build());
            attrMap.put("SK", AttributeValue.builder().s(Integer.toString(sk)).build());
            attrMap.put("Balance", AttributeValue.builder().n(Integer.toString(balance)).build());

            PutItemRequest request = PutItemRequest.builder()
                    .tableName(TABLE_NAME)
                    .item(attrMap)
                    .build();

            dynamoDbClient.putItem(request);
        } catch (DynamoDbException e) {
            log.error("Error adding an item to the table: {} ", e.getMessage());
        }
    }

    private void queryItem(String pk) {
        try {
            // PK 만 써도 돼요.
            HashMap<String, Condition> attrMap = new HashMap<>();
            attrMap.put("PK", Condition.builder()
                    .attributeValueList(AttributeValue.builder().s(pk).build())
                    .comparisonOperator(ComparisonOperator.EQ)
                    .build());

            QueryRequest request = QueryRequest.builder()
                    .tableName(TABLE_NAME)
                    .keyConditions(attrMap)
                    .build();

            QueryResponse response = dynamoDbClient.query(request);
            response.items().forEach((value) -> log.info("value: {}", value));
        } catch (DynamoDbException e) {
            log.error("Error getting an item from the table: {} ", e.getMessage());
        }
    }

    private void updateItem(String pk, int sk, int balance) {
        try {
            HashMap<String, AttributeValue> attrMap = new HashMap<>();
            attrMap.put("PK", AttributeValue.builder().s(pk).build());
            attrMap.put("SK", AttributeValue.builder().s(Integer.toString(sk)).build());

            String balanceStr = String.valueOf(balance);
            // Create an UpdateItemRequest
            UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                    .tableName(TABLE_NAME)
                    .key(attrMap)
                    .attributeUpdates(
                            new HashMap<String, AttributeValueUpdate>() {{
                                put("balance", AttributeValueUpdate.builder()
                                        .value(AttributeValue.builder().n(balanceStr).build())
                                        .action(AttributeAction.PUT)
                                        .build());
                            }}
                    ).build();


            UpdateItemResponse response = dynamoDbClient.updateItem(updateItemRequest);

            // 결과 출력.
            Map<String, AttributeValue> attributes = response.attributes();
            if (attributes != null) {
                for (Map.Entry<String, AttributeValue> entry : attributes.entrySet()) {
                    String attributeName = entry.getKey();
                    AttributeValue attributeValue = entry.getValue();
                    System.out.println(attributeName + ": " + attributeValue);
                }
            } else {
                System.out.println("Item was updated, but no attributes were returned.");
            }
        } catch (DynamoDbException e) {
            System.err.println("Error getting an item from the table: " + e.getMessage());
        }
    }
}

CQRS

command(데이터변동)과 query(데이터 조회)를 분리함으로써 비즈니스 로직이 포함된 API를 구현하는 패턴이다.

API Aggregation

클라이언트와 서버 간 통신의 효율성을 높이기 위한 설계로 여러 API 호출을 하나의 API로 집계(aggregation)하여 클라이언트가 한 번의 호출로 필요한 데이터를 얻을 수 있도록 하는 설계 방식이며 주로 API Gateway나 Aggregator 서비스를 통해 구현된다.

  • 여러 마이크로서비스로 분리된 데이터를 통합하여 한 번에 제공.
  • 클라이언트가 여러 개의 API를 직접 호출하지 않도록 함.
  • 응답 시간이 느려질 수 있음(여러 API 호출로 인해).
  • Aggregator API가 병목이 될 가능성.

단점으로
요청한 데이터가 많을수록 해당 서비스에서 많은 API를 호출받게 될 것인데 이때 DB 부하 측면에서 괜찮다고해도 응답 시간이 느려질 수 있다.이를 해결하기 위해 CQRS 패턴 도입

API Aggregation + CQRS

CQRS 기반의 시스템에서 API Aggregation을 통해 클라이언트 응답 성능을 최적화

profile
하루 일지 보단 행동 고찰 과정에 대한 개발 블로그

0개의 댓글

관련 채용 정보