Command: 명령이 Aggregate에서 처리되며, 이벤트가 발행.
Event: 이벤트가 저장소(Event Store)에 저장되고 Query Model로 전파.
Query: Query Model은 이벤트를 기반으로 데이터를 읽기 데이터베이스에 저장하여 최적화된 읽기 작업을 수행.
package com.yun.money.adapter.axon.command;
import com.yun.common.SelfValidating;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import org.axonframework.modelling.command.TargetAggregateIdentifier;
@Getter
@ToString
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@EqualsAndHashCode(callSuper = false)
public class MemberMoneyCreateCommand extends SelfValidating<MemberMoneyCreateCommand> {
@NotNull
@NotEmpty
@TargetAggregateIdentifier // 명령이 어느 Aggregate에 전달될지 식별
private String membershipId;
public MemberMoneyCreateCommand(String membershipId) {
this.membershipId = membershipId;
this.validateSelf();
}
}
1. Aggregate 클래스: 명령 처리를 위한 클래스
@Aggregate 명시
2. @AggregateIdentifier: ID 필드에 Aggregate의 고유 식별자 (Aggregate ID)
private String id;
3. 명령 핸들러: CreateOrderCommand를 처리
@CommandHandler 명시
public MemberMoneyAggregate(MemberMoneyCreateCommand command) {
//명령 실행시 command가 존재하는지 유효성 검사를 할 수 있다.
log.info("MemberMoneyAddCommand Handler");
//이벤트 발행 (MemberMoneyCreateEvent)
apply(new MemberMoneyCreateEvent(id, command.getMembershipId()));
}
4. 이벤트 소싱 핸들러: MemberMoneyCreateEvent를 처리하여 상태를 업데이트
@EventSourcingHandler
public void on(MemberMoneyCreateEvent event) {
log.info("MemberMoneyAddEvent Sourcing Handler");
id = UUID.randomUUID().toString();
membershipId = event.getMembershipId();
balance = 0;
}
5. 다음 명령 핸들러: UpdateOrderStatusCommand를 처리
@CommandHandler
public void handle(UpdateOrderStatusCommand command) {
// 상태 변경 가능 여부를 검증
if (!"CREATED".equals(this.status)) {
throw new IllegalStateException("Order cannot be updated in current state");
}
// 이벤트를 발행 (OrderStatusUpdatedEvent)
apply(new OrderStatusUpdatedEvent(command.getOrderId(), command.getNewStatus()));
}
6. 이벤트 소싱 핸들러: OrderStatusUpdatedEvent를 처리하여 상태를 업데이트
@EventSourcingHandler
public void on(OrderStatusUpdatedEvent event) {
this.status = event.getNewStatus();
}
package com.yun.money.adapter.axon.event;
import com.yun.common.SelfValidating;
import com.yun.money.adapter.axon.command.MemberMoneyCreateCommand;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
@Getter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class MemberMoneyCreateEvent extends SelfValidating<MemberMoneyCreateCommand> {
private String aggregateIdentifier;
private String membershipId;
}
Query Model은 이벤트를 구독하고 읽기 전용 데이터베이스 업데이트.
package com.yun.moneyqueryservice.adapter.in.axon;
import com.yun.moneyqueryservice.application.port.out.GetMemberAddressInfoPort;
import com.yun.moneyqueryservice.application.port.out.InsertMoneyIncreaseEventByAddress;
import com.yun.moneyqueryservice.application.port.out.MemberAddressInfo;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MoneyAdjustEventHandler {
//⭐️RequestFirmbankingFinishedEvent를 구독하여 읽기 모델 업데이트
@EventHandler
public void handler(RequestFirmbankingFinishedEvent event
, GetMemberAddressInfoPort getMemberAddressInfoPort
, InsertMoneyIncreaseEventByAddress insertMoneyIncreaseEventByAddress) {
log.info("money adjust event receiced : {}", event.toString());
//고객 주소 정보
MemberAddressInfo memberAddressInfo = getMemberAddressInfoPort.getMemberAddressInfo(event.getMembershipId());
//⭐️dynamoDB insert : 읽기 데이터베이스에 주문 저장
String address = memberAddressInfo.address();
int moneyAmount = event.getMoneyAmount();
log.info("dynamo insert: {}, {}", address, moneyAmount);
insertMoneyIncreaseEventByAddress.insertMoneyIncreaseEventByAddress(address, moneyAmount);
}
}
DynamoDBAdapter 읽기 전용 DB
package com.yun.moneyqueryservice.adapter.out.aws.dynamodb;
import com.yun.moneyqueryservice.application.port.out.GetMemberAddressInfoPort;
import com.yun.moneyqueryservice.application.port.out.InsertMoneyIncreaseEventByAddress;
import com.yun.moneyqueryservice.application.port.out.MemberAddressInfo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RequiredArgsConstructor
@Component
public class DynamoDBAdapter implements GetMemberAddressInfoPort, InsertMoneyIncreaseEventByAddress {
@Value("money.query.table")
private String TABLE_NAME;
@Value("money.dynamo.access")
private String ACCESS_KEY;
@Value("money.dynamo.secret")
private String SECRET_KEY;
private final DynamoDbClient dynamoDbClient;
private final MoneySumByAddressMapper mapper;
public DynamoDBAdapter() {
this.mapper = new MoneySumByAddressMapper();
AwsBasicCredentials awsBasicCredentials = AwsBasicCredentials.create(ACCESS_KEY, SECRET_KEY);
this.dynamoDbClient = DynamoDbClient.builder()
.region(Region.AP_NORTHEAST_2)
.credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials))
.build();
}
/*@Override
public void insertMoneyIncreaseEventByAddress(String addressName, int moneyIncrease) {
//1. raw event insert
String pk = addressName + "#" + LocalDateTime.now();
int sk = moneyIncrease;
putItem(pk, sk);
//2. 지역 정보 잔액 증가
//2-1. 지역별/일별 정보
String summaryPk = pk + "summary";
int summarySk = -1;
MoneySumByAddress item = getItem(summaryPk, summarySk);
if (item == null) {
putItem(summaryPk, summarySk, moneyIncrease);
}
//합산 TODO: domain으로 분리
int balance = item.getBalance();
balance += moneyIncrease;
//update
//2-2. 지역별 정보
String addressPk = addressName;
int addressSk = -1;
MoneySumByAddress addressItem = getItem(addressPk, addressSk);
if (addressItem == null) {
putItem(addressPk, addressSk, moneyIncrease);
}
//합산 TODO: domain으로 분리
int addressItemBalance = addressItem.getBalance();
addressItemBalance += moneyIncrease;
putItem(addressPk, addressSk, addressItemBalance);
}*/
@Override
public void insertMoneyIncreaseEventByAddress(String addressName, int moneyIncrease) {
// 3개의 일을 해야될 것이에요.
// 1. raw event insert (Insert, put)
// PK: 강남구#230728 SK: 5,000, balance, 5,000
String pk = addressName + "#" + "230728";
int sk = moneyIncrease;
putItem(pk, sk, moneyIncrease);
// 2. 지역 정보 잔액 증가시켜야 해요. (Query, Update)
// 2-1. 지역별/일별 정보
// - PK: 강남구#230728#summary SK: -1 balance: + 5,000
String summaryPk = pk + "#summary";
int summarySk = -1;
MoneySumByAddress moneySumByAddress = getItem(summaryPk, summarySk);
if (moneySumByAddress == null) {
putItem(summaryPk, summarySk, moneyIncrease);
} else{
int balance = moneySumByAddress.getBalance();
balance += moneyIncrease;
updateItem(summaryPk, summarySk, balance);
}
// 2-2. 지역별 정보
// - PK: 강남구 SK: -1 balance: + 5,000
String summaryPk2 = addressName;
int summarySk2 = -1;
MoneySumByAddress moneySumByAddress2 = getItem(summaryPk2, summarySk2);
if (moneySumByAddress2 == null) {
putItem(summaryPk2, summarySk2, moneyIncrease);
} else{
int balance2 = moneySumByAddress2.getBalance();
balance2 += moneyIncrease;
updateItem(summaryPk2, summarySk2, balance2);
}
}
@Override
public MemberAddressInfo getMemberAddressInfo(String membershipId) {
return null;
}
private MoneySumByAddress getItem(String pk, int sk) {
try {
HashMap<String, AttributeValue> attrMap = new HashMap<>();
attrMap.put("PK", AttributeValue.builder().s(pk).build());
attrMap.put("SK", AttributeValue.builder().n(Integer.toString(sk)).build());
GetItemRequest request = GetItemRequest.builder()
.tableName(TABLE_NAME)
.key(attrMap)
.build();
GetItemResponse response = dynamoDbClient.getItem(request);
if (response.hasItem()) {
mapper.mapToMoneySumByAddress(response.item());
}
} catch (DynamoDbException e) {
log.error("Error getting an item from the table: {} ", e.getMessage());
}
return null;
}
private void putItem(String pk, int sk) {
try {
HashMap<String, AttributeValue> attrMap = new HashMap<>();
attrMap.put("PK", AttributeValue.builder().s(pk).build());
attrMap.put("SK", AttributeValue.builder().s(Integer.toString(sk)).build());
PutItemRequest request = PutItemRequest.builder()
.tableName(TABLE_NAME)
.item(attrMap)
.build();
dynamoDbClient.putItem(request);
} catch (DynamoDbException e) {
log.error("Error adding an item to the table: {} ", e.getMessage());
}
}
private void putItem(String pk, int sk, int balance) {
try {
HashMap<String, AttributeValue> attrMap = new HashMap<>();
attrMap.put("PK", AttributeValue.builder().s(pk).build());
attrMap.put("SK", AttributeValue.builder().s(Integer.toString(sk)).build());
attrMap.put("Balance", AttributeValue.builder().n(Integer.toString(balance)).build());
PutItemRequest request = PutItemRequest.builder()
.tableName(TABLE_NAME)
.item(attrMap)
.build();
dynamoDbClient.putItem(request);
} catch (DynamoDbException e) {
log.error("Error adding an item to the table: {} ", e.getMessage());
}
}
private void queryItem(String pk) {
try {
// PK 만 써도 돼요.
HashMap<String, Condition> attrMap = new HashMap<>();
attrMap.put("PK", Condition.builder()
.attributeValueList(AttributeValue.builder().s(pk).build())
.comparisonOperator(ComparisonOperator.EQ)
.build());
QueryRequest request = QueryRequest.builder()
.tableName(TABLE_NAME)
.keyConditions(attrMap)
.build();
QueryResponse response = dynamoDbClient.query(request);
response.items().forEach((value) -> log.info("value: {}", value));
} catch (DynamoDbException e) {
log.error("Error getting an item from the table: {} ", e.getMessage());
}
}
private void updateItem(String pk, int sk, int balance) {
try {
HashMap<String, AttributeValue> attrMap = new HashMap<>();
attrMap.put("PK", AttributeValue.builder().s(pk).build());
attrMap.put("SK", AttributeValue.builder().s(Integer.toString(sk)).build());
String balanceStr = String.valueOf(balance);
// Create an UpdateItemRequest
UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
.tableName(TABLE_NAME)
.key(attrMap)
.attributeUpdates(
new HashMap<String, AttributeValueUpdate>() {{
put("balance", AttributeValueUpdate.builder()
.value(AttributeValue.builder().n(balanceStr).build())
.action(AttributeAction.PUT)
.build());
}}
).build();
UpdateItemResponse response = dynamoDbClient.updateItem(updateItemRequest);
// 결과 출력.
Map<String, AttributeValue> attributes = response.attributes();
if (attributes != null) {
for (Map.Entry<String, AttributeValue> entry : attributes.entrySet()) {
String attributeName = entry.getKey();
AttributeValue attributeValue = entry.getValue();
System.out.println(attributeName + ": " + attributeValue);
}
} else {
System.out.println("Item was updated, but no attributes were returned.");
}
} catch (DynamoDbException e) {
System.err.println("Error getting an item from the table: " + e.getMessage());
}
}
}
command(데이터변동)과 query(데이터 조회)를 분리함으로써 비즈니스 로직이 포함된 API를 구현하는 패턴이다.
클라이언트와 서버 간 통신의 효율성을 높이기 위한 설계로 여러 API 호출을 하나의 API로 집계(aggregation)하여 클라이언트가 한 번의 호출로 필요한 데이터를 얻을 수 있도록 하는 설계 방식이며 주로 API Gateway나 Aggregator 서비스를 통해 구현된다.
단점으로
요청한 데이터가 많을수록 해당 서비스에서 많은 API를 호출받게 될 것인데 이때 DB 부하 측면에서 괜찮다고해도 응답 시간이 느려질 수 있다.이를 해결하기 위해 CQRS 패턴 도입
CQRS 기반의 시스템에서 API Aggregation을 통해 클라이언트 응답 성능을 최적화