인터넷을 떠돌아 다니다가 CQRS라는 좋은 패턴이 보이길래 한번 공부해보았습니다.
CQRS를 알아보기 전에 먼저 Event Sourcing란 무엇인지 알아보겠습니다.
이벤트 소싱이란 영속 데이터를 어떤 식으로 저장할 것인지에 관한 것으로,
기존에 저희가 대중적으로 사용하고 있는 데이터 저장 방법은 현실 세계의 것들을 직접적으로 표현하고 있는 도메인 객체를 DB에 담는 방식이었습니다.
이러한 방식은 직관적이어서 이해하기 쉽다는 장점이 있지만, 항상 도메인 객체의 최종 상태만을 담고 있기 때문에 변경된 기록을 정확하게 추적할 수 없고, 한 데이터에 대해 검색 및 변경 요청이 빈번하게 발생하기 때문에 동시성 문제를 갖고 있다는 단점이 존재합니다.
반면 이벤트 소싱 방식은 도메인 객체를 생성하고, 상태를 변경하기 위해 발생하는 이벤트(Event)들을 DB에 저장함으로써 위에 언급한 문제들의 해결을 시도합니다.
이벤트 소싱 방식에서는 도메인 객체에 대한 변경 이벤트를 모두 추적할 수 있으며,
이벤트는 한 번 발생한 이후 수정되지 않기 때문에 UPDATE나 DELETE 없이 항상 INSERT 작업만 일어납니다. 따라서 동시성 문제로부터 비교적 자유롭습니다.
즉 기존에 데이터 결과물만 저장하는 것이 아닌 데이터의 모든 이벤트들을 기록하여 그 이벤트들의 최종 결과를 조회한다고 보시면 될것 같습니다. (저희가 일반적으로 이해하는 데이터랑 느낌이 다르네요!)
그렇다면 이벤트 소싱 방식에서 질의(query)에 대한 응답은 어떻게 이루어질까? 질의란 데이터를 변경하지 않고 현재 데이터의 상태를 조회하는 요청을 뜻합니다.
실제 DB에 담겨 있는 데이터는 이벤트들의 집합이므로, 이 이벤트들을 시간 순서대로 실행하고 나면 최종적인 데이터를 얻을 수 있습니다. 하지만 조회 요청이 들어올 때마다 쌓여있는 이벤트들을 모두 실행시키는 것은 너무나 비효율적이므로, 데이터의 조회에 대한 새로운 모델이 필요해보입니다. 이러한 맥락에서 이벤트 소싱 패턴은 명령(command)와 질의(query)의 책임을 분리하는 패턴인 CQRS 패턴과 잘 어울립니다.
Axon Framework는 DDD 패러다임 하에서 Event Sourcing과 CQRS 패턴을 이용해 애플리케이션을 작성할 수 있도록 도와주는 Framework입니다.
해당 프레임워크와 함께 스프링부트에서 CQRS 및 Event Sourcing 방식의 만들어보려고 합니다.
의존성
implementation("org.axonframework:axon-spring-boot-starter:4.5.15")
src/main/kotlin/
├── command
└── account
├── config
├── controller
├── domain
└── aggregate
└── entity
├── dto
├── event
└── account
├── repository
├── service
└── impl
모든 Command는 BaseCommand를 상속하여 작동을 합니다.
open class BaseCommand<T> (
@TargetAggregateIdentifier
open val id: T
)
@TargetAggregateIdentifier
: commandGateway가 적절한 Aggregate id를 가진 Instance에 해당 Command를 실행할 수 있도록 해줌.Event는 위에서 언급했듯이 도메인 객체에 적용되는 여러 변경사항들을 나타내고, 이벤트 소싱 패턴에서는 이 이벤트들을 DB에 저장합니다. 이 또한 Command와 같이 BaseEvent를 정의하여 이 클래스를 상속하도록 했는데, 이렇게 함으로써 손쉽게 Query를 구현할 수 있습니다.
open class BaseEvent<T> (
open val id: T
)
Aggregate에 대해 다시 이야기를 해보자면 DDD에서 맨 처음 등장한 개념으로, 밀접한 관련이 있는 도메인 객체들을 하나로 묶은 것입니다. Event Sourcing에서 Event란 구체적으로 Domain Event를 뜻하는데, Axon Framework는 Aggregate를 단위로 Domain Event를 DB에 저장할 수 있도록 기능을 제공합니다. 이 Aggregate 객체에 해당 도메인 영역에 대한 핵심 비즈니스 로직이 모여있게 됩니다.
Entity는 Query를 위한 Entity로, 해당 실습에서는 Event Store 뿐만 아니라 Query를 위한 DB도 따로 두어 Command가 발생할 때마다 해당 Aggregate의 Query Entity를 업데이트해주는 방식으로 CQRS의 Query 파트를 구현하였습니다.
Request -> Controller -> Service까지는 완전 동일합니다.
Service 객체는 CommandGateway를 주입받는데, 이 객체는 아래와 같이 Command를 @CommandHandler
이 붙은 함수로 넘겨주는 게이트웨이 역할을 합니다. 그리고 CommandHandler 함수는 Event를 발생시켜 최종적으로는 @EventHandler
이 붙은 함수로 넘겨줍니다.
CommandGateWay -> Command -> Event
@Service
class AccountCommandServiceImpl(
@Autowired private val commandGateway: CommandGateway
): AccountCommandService {
override fun createAccount(accountCreateDto: AccountCreateDto): CompletableFuture<String> {
// send를 하게 되면 CommandHandler 로 넘겨짐
return commandGateway.send(CreateAccountCommand(
UUID.randomUUID().toString(),
accountCreateDto.startingBalance,
accountCreateDto.currency
))
}
override fun creditMoneyToAccount(accountNumber: String, moneyCreditDto: MoneyCreditDto): CompletableFuture<String> {
return commandGateway.send(CreditMoneyCommand(
accountNumber,
moneyCreditDto.creditAmount,
moneyCreditDto.currency
))
}
override fun debitMoneyFromAccount(accountNumber: String, moneyDebitDto: MoneyDebitDto): CompletableFuture<String> {
return commandGateway.send(DebitMoneyCommand(
accountNumber,
moneyDebitDto.debitAmount,
moneyDebitDto.currency
))
}
}
Query Service는 그냥 Event Store 혹은 Repository에서 조회만 하면 끝입니다.
CommandHandler 및 EventSourcingHandler 함수들은 Aggregate 객체 내에 위치하게 됩니다.
@Aggregate
class AccountAggregate {
@AggregateIdentifier
var id: String = ""
...
@CommandHandler
constructor(createAccountCommand: CreateAccountCommand) {
this.id = createAccountCommand.id
AggregateLifecycle.apply(AccountCreatedEvent(createAccountCommand.id, createAccountCommand.accountBalance, createAccountCommand.currency))
}
@EventSourcingHandler
fun on(accountCreatedEvent: AccountCreatedEvent) {
this.id = accountCreatedEvent.id
this.accountBalance = accountCreatedEvent.accountBalance
this.currency = accountCreatedEvent.currency
this.status = "CREATED"
AggregateLifecycle.apply(AccountActivatedEvent(this.id, "ACTIVATED"))
}
@EventSourcingHandler
fun on(accountActivatedEvent: AccountActivatedEvent) {
this.status = accountActivatedEvent.status
}
@CommandHandler
fun on(creditMoneyCommand: CreditMoneyCommand) {
AggregateLifecycle.apply(MoneyCreditedEvent(creditMoneyCommand.id, creditMoneyCommand.creditAmount, creditMoneyCommand.currency))
}
....
}
Event Sourcing은 도메인 객체가 아닌 도메인 객체에 가해지는 이벤트들을 저장하는 방식이고, 도메인 객체의 최종적인 상태를 조회하기 위해서는 추가적인 전략이 필요합니다. 이번 실습에서는 Aggregate에 대응하는 Query Entity를 만들어, 해당 Aggregate 내에서 Event가 발생할 때마다 이 Query Entity를 업데이트해주는 방식으로 데이터의 최종 상태를 유지할 수 있도록 하였습니다.
그렇게 하기 위한 방법으로, 해당 Aggregate가 상속하고 있는 공통된 Base Event에 대한 EventSourcingHandler를 만들어 이 함수에서 Query Entity를 업데이트하도록 했습니다.
@Component
class AccountQueryEntityManager (
@Autowired
private val accountRepository: AccountRepository,
@Autowired
@Qualifier("accountAggregateEventSourcingRepository")
private val accountAggregateEventSourcingRepository: EventSourcingRepository<AccountAggregate>
) {
// 이벤트가 발생할때 마다 Query Entity를 최신화 시켜줌
@EventSourcingHandler
fun on(event: BaseEvent<String>) {
persistAccount(buildQueryAccount(getAccountFromEvent(event)))
}
private fun getAccountFromEvent(event: BaseEvent<String>): AccountAggregate {
return accountAggregateEventSourcingRepository.load(event.id).wrappedAggregate.aggregateRoot
}
private fun findExistingOrCreateQueryAccount(id: String): AccountQueryEntity {
return if(accountRepository.findById(id).isPresent) accountRepository.findById(id).get()
else AccountQueryEntity()
}
private fun buildQueryAccount(accountAggregate: AccountAggregate): AccountQueryEntity {
val accountQueryEntity = findExistingOrCreateQueryAccount(accountAggregate.id)
accountQueryEntity.id = accountAggregate.id
accountQueryEntity.accountBalance = accountAggregate.accountBalance
accountQueryEntity.currency = accountAggregate.currency
accountQueryEntity.status = accountAggregate.status
return accountQueryEntity
}
private fun persistAccount(accountQueryEntity: AccountQueryEntity) {
accountRepository.save(accountQueryEntity)
}
}
이러한 Query 업데이트에 대해 단점이 존재하는데 Query를 위한 데이터를 따로 저장해두기 때문에 앞서 말한 기존 방식의 동시성 문제에 똑같이 노출되어있다는 점입니다. 또한 Event를 저장함과 동시에 Query를 업데이트하는 작업도 하기 때문에 Disk I/O 작업이 많아져 성능 저하의 우려가 있습니다.(캐싱 처리가 필요할 듯!)
등록
Query Entity 조회
이벤트 조회
CQRS 패턴이 처음에는 데이터 저장 방식 때문에 되게 생소했는데
한번 공부를 해보고 직접 해보고 나서는 이렇게 작동하는구나를 이해하였습니다.
또 Query 즉 조회가 분리되어있다 보니까 캐싱처리를 적용하기에 정말 편할것 같다는 생각이 듭니다.
이러한 이벤트들이 한눈에 쉽게 볼수가 있어서 이러한 특성이 필요한 도메인 데이터가 있으면 CQRS 패턴과 이벤트 소싱 패턴을 적용하면 좋을 것 같습니다.
모든 소스코드를 블로그 내용에 담기에는 방대하기 때문에 밑에 참조 링크를 남기겠습니다.
(근데 그렇게 엄청나게 많지도 않아요)