이번 포스팅에서는 Command Query Responsibility Segregation (CQRS) 와 Event Sourcing 디자인 패턴에 대해 간단히 알아봅니다.
우선 이 두 가지 개념을 개별적으로 이해하고 구현합니다. 이 후 두 개념을 통합하고 구현합니다.
Event Sourcing은 애플리케이션 상태를 순서가 지정된 이벤트 시퀀스로 저장하는 새로운 방식을 제공합니다. 우리는 이러한 이벤트를 선택적으로 쿼리하고 언제든지 애플리케이션 상태를 재구성할 수 있습니다.
예를 들어 상태 변경과 함께 로그를 쌓고 싶다고 할 때, 장바구니에 상품을 담았다가 삭제할 수 있습니다. 그런데 유저가 또 다시 상품을 담았다가 삭제합니다. 최종적인 관점에서 본다면 장바구니에 상품이 없어진 것으로만 볼 수 있습니다. 하지만 이 때 비즈니스적으로 맞춤광고를 내보이고 싶다면 어떨까요? 이러한 상태변화들을 기록하고 있지 않아 불가능합니다. 이럴 때 Event Sourcing을 통해 상태변화 기록들을 이용한다면 충분히 가능한 이야기가 될 수 있습니다.
몇 가지 사실을 이야기 하고 가겠습니다. (알아만 두시면 편합니다.)
간단히 말해서 CQRS는 command side와 query side를 분리하는 애플리케이션 아키텍쳐입니다.
CQRS는 CQS(Command Query Seperation)원칙을 기반으로 하며 Bertrand Meyer에 의해 제안되었습니다. Query는 결과를 반환하고 시스템의 상태를 바꾸지 않습니다. 반면 Command는 시스템의 상태를 바꾸지만 값을 반환하지는 않습니다. CQRS는 메서드가 아니라 모델을 의미하므로 메서드의 리턴까지는 제한 하지 않는 다는 의견이 있습니다. 참고
왼쪽 그림과 같이 하나의 read와 write 저장소를 사용하여도 되지만 오른쪽 그림과 같이 물리적으로 두 개의 저장소로 나눈 뒤 동기화를 유지하는 매커니즘을 도입할 수 있습니다.
이론적으로 CQRS와 Event Sourcing은 종속되지 않지만 현실에서는 이 두개는 필수적으로 종속됩니다.
유저의 프로필 관련한 CRUD 작업이 있는 간단한 애플리케이션을 만들어 봅니다. 이 애플리케이션을 통해 CQRS와 Event Sourcing을 설명하겠습니다.
위 그림에 맞는 애플리케이션을 만들기 위해 유저관련 도메인들을 생성합니다. 간단한 예제이다 보니 고칠점이 눈에 많이 보입니다만, 무시하도록 합시다.
class User(
val userId: String,
val firstName: String,
val lastName: String
) {
var contacts: Set<Contact> = setOf()
var addresses: Set<Address> = setOf()
}
data class Contact(
val type: String,
val detail: String
)
data class Address(
val city: String,
val state: String,
val postcode: String
)
그리고 레퍼지토리를 생성합니다.
class UserRepository(
private val store:MutableMap<String, User> = mutableMapOf()
) {
fun addUser(userId: String, user: User) {
store[userId] = user
}
fun getUser(userId: String): User {
return store[userId]!!
}
}
서비스 클래스도 만들어 줍니다.
class UserService (
private val repository: UserRepository
) {
fun createUser(userId: String, firstName: String, lastName: String) {
val user = User(userId, firstName, lastName)
repository.addUser(userId, user)
}
fun updateUser(userId: String, contacts: Set<Contact>, addresses: Set<Address>) {
val user = repository.getUser(userId)
user.contacts = contacts
user.addresses = addresses
repository.addUser(userId, user)
}
fun getContactByType(userId: String, contactType: String): Set<Contact> {
val user = repository.getUser(userId)
return user.contacts
.filter { it.type == contactType }
.toSet()
}
fun getAddressByRegion(userId: String, state: String): Set<Address> {
val user = repository.getUser(userId)
return user.addresses
.filter { it.state == state }
.toSet()
}
}
악취가 조금 나도 우리가 학습하는데에는 문제가 없어보입니다.
많은 문제들이 보이지만 그 중에서 저희는 두 가지에만 집중을 할 것입니다.
읽기연산과 쓰기연산을 다루기 위해 도메인 모델과 영속성 모델을 분리할 것입니다.
새로운 컴포넌트들이 도입된 것을 볼 수 있습니다.
Aggregate/Aggregator
Aggregate는 엔티티를 Aggregate root에 바인딩하여 다양한 엔티티를 논리적으로 그룹화하는 DDD에 설명된 패턴입니다.
Aggregate는 엔티티간의 트랜잭션 일관성을 제공합니다.
CQRS는 쓰기 도메인 모델을 그룹화하여 Aggregate 패턴의 이점을 자연스럽게 활용합니다. Aggregate는 일반적으로 더 나은 상태를 위해 캐시된 상태를 유지할 수 있지만 필수적인 것은 아닙니다.
Projection/Projector
Projection이란 본질적으로 다양한 모양과 구조로 도메인 모델을 나타내는 것을 의미합니다. 원본데이터의 Projection들은 오직 읽기 전용이며 향상된 read 경험을 제공하며 최적화 되어있습니다.
우리는 필요한 Command에 대한 정의를 하면서 write side를 구현할 수 있습니다.
command는 도메인 모델의 상태를 변경하려는 의도이며 이것의 성공여부는 우리가 구성한 비즈니스 규칙에 달려있습니다. Command는 아래와 같습니다.
data class CreateUserCommand(
val userId: String,
val firstName: String,
val lastName: String
)
data class UpdateUserCommand(
val userId: String,
val addresses: Set<Address>,
val contact: Set<Contact>
)
다음으로 명령을 받고 처리하는 Aggregate를 정의합니다. Aggregate는 Command를 수락하거나 거부할 것입니다.
/*command용 서비스라고 이해*/
class UserAggregate(
val writeRepository: UserWriteRepository
) {
fun handleCreateUserCommand(command: CreateUserCommand): User {
val user = User(command.userId, command.firstName, command.lastName)
writeRepository.addUser(user.userId, user)
return user
}
fun handleUpdateUserCommand(command: UpdateUserCommand): User {
val user:User = writeRepository.getUser(command.userId)
user.addresses = command.addresses
user.contacts = command.contact
writeRepository.addUser(user.userId, user)
return user
}
}
이어서 레퍼지토리도 구현합니다.
class UserWriteRepository(
private val store:MutableMap<String, User> = mutableMapOf()
) {
fun addUser(userId: String, user: User) {
store[userId] = user
}
fun getUser(userId: String): User {
return store[userId]!!
}
}
/*쿼리된 결과가 담길 DTO라고 이해*/
class UserAddress(
val addressByRegion: Map<String, Set<Address>> = mapOf()
)
class UserContact(
val contactByType: Map<String, Set<Contact>> = mapOf()
)
/*쿼리용 파라미터라고 이해*/
data class ContactByTypeQuery(
val userId: String,
val contactType: String
)
data class AddressByRegionQuery(
val userId: String,
val state: String
)
/*조회 서비스라고 이해*/
class UserProjection(
val readRepository: UserReadRepository
) {
fun handle(query: ContactByTypeQuery): Set<Contact> {
val userContact = readRepository.getUserContact(query.userId)
return userContact.contactByType[query.contactType]!!
}
fun handle(query: AddressByRegionQuery): Set<Address> {
val userAddress: UserAddress = readRepository.getUserAddress(query.userId)
return userAddress.addressByRegion[query.state]!!
}
}
/*쿼리용 query dsl 서비스라고 이해*/
class UserReadRepository(
private val store: MutableMap<String, User> = mutableMapOf()
) {
fun getUserContact(userId: String): UserContact {
val user = store[userId]!!
return UserContact(mapOf(userId to user.contacts))
}
fun getUserAddress(userId: String): UserAddress {
val user = store[userId]!!
return UserAddress(mapOf(userId to user.addresses))
}
}
현재 저희는 쓰기와 읽기 저장소를 동기화하는 부분이 없습니다. 이 부분에서 저희는 Projector를 알아야 합니다. Projector는 write domain 모델을 read domain 모델로 projection(투영)하는 로직을 갖고 있습니다. 소스코드는 아래와 같습니다.
class User(
val userId: String,
val firstName: String,
val lastName: String
) {
/*유저필드 변경*/
var contacts: MutableSet<Contact> = mutableSetOf()
var addresses: MutableSet<Address> = mutableSetOf()
}
Projector를 만들어줍니다.
class UserProjector(
val readRepository: UserReadRepository
) {
fun project(user: User) {
val userContact = readRepository.getUserContact(user.userId)
val contactByType: MutableMap<String, MutableSet<Contact>> = mutableMapOf()
for (contact in user.contacts) {
val contacts = contactByType[contact.type] ?: mutableSetOf()
contacts.add(contact)
contactByType[contact.type] = contacts
}
userContact.contactByType = contactByType
readRepository.addUserContact(user.userId, userContact)
val userAddress = readRepository.getUserAddress(user.userId)
val addressByRegion: MutableMap<String, MutableSet<Address>> = mutableMapOf()
for (address in user.addresses) {
val addresses = addressByRegion[address.state] ?: mutableSetOf()
addresses.add(address)
addressByRegion[address.state] = addresses
}
userAddress.addressByRegion = addressByRegion
readRepository.addUserAddress(user.userId, userAddress)
}
}
굉작히 조악해보이지만 충분한 인사이트를 얻을 수 있습니다. 위의 코드는 사실 필수가 아닙니다. 읽기와 쓰기 저장소를 물리적으로 다른 곳에 위치시키지 않고 한 곳의 저장소에서도 이용가능합니다. 이렇게 되면 동기화시키는 작업이 따로 필요가 없게 되겠죠?
abstract class Event(
val id: UUID = UUID.randomUUID(),
val created: Date = Date()
)
보시다시피 저희가 만드는 모든 이벤트는 유니크한 id와 타임스탬프를 갖습니다. 추가적인 데이터가 필요하시다면 필드를 추가하셔도 무방합니다.
class UserCreatedEvent(
val userId: String,
val firstName: String,
val lastName: String
) : Event()
class UserContactAddedEvent(
val contactType: String,
val contactDetails: String
) : Event()
class UserContactRemovedEvent(
val contactType: String,
val contactDetails: String
) : Event()
class UserAddressAddedEvent(
val city: String,
val state: String,
val postCode: String
) : Event()
class UserAddressRemovedEvent(
val city: String,
val state: String,
val postCode: String
) : Event()
Event를 상속받는 세분화된 이벤트들을 만들었습니다. 이 때 과거시제를 사용하여 클래스 네이밍을 해야합니다.
class EventStore(
val store: MutableMap<String, List<Event>> = mutableMapOf()
)
이벤트들을 순차적으로 저장할 수 있는 저장소를 만들었습니다.
이벤트 저장소 코드를 아래와 같이 바꿔줍니다.
class EventStore(
private val store: MutableMap<String, MutableList<Event>> = mutableMapOf()
) {
fun addEvent(userId: String, event: Event) {
var events: MutableList<Event>? = store[userId]
if (events == null) {
events = mutableListOf()
events.add(event)
store[userId] = events
} else {
events.add(event)
}
}
operator fun get(userId: String): MutableList<Event> {
return store[userId]?: mutableListOf()
}
}
이후에 실제로 저장소를 사용하는 곳의 코드를 작성하면 아래와 같습니다.
class UserService(
private val repository: EventStore
) {
fun createUser(userId: String, firstName: String, lastName: String) {
repository.addEvent(userId, UserCreatedEvent(userId, firstName, lastName))
}
fun updateUser(userId: String, contacts: Set<Contact>, addresses: Set<Address>) {
val user:User = UserUtility.recreateUserState(repository, userId)
user.contacts
.filter { c -> !contacts.contains(c) }
.forEach { repository.addEvent(userId, UserContactRemovedEvent(it.type, it.detail)) }
contacts
.filter { !user.contacts.contains(it) }
.forEach { repository.addEvent(userId, UserContactAddedEvent(it.type, it.detail)) }
user.addresses
.filter { !addresses.contains(it)}
.forEach { repository.addEvent(userId, UserAddressRemovedEvent(it.city, it.state, it.postcode)) }
addresses
.filter { !user.addresses.contains(it) }
.forEach { repository.addEvent(userId, UserAddressAddedEvent(it.city, it.state, it.postcode)) }
}
fun getContactByType(userId: String, contactType: String): Set<Contact> {
val user = UserUtility.recreateUserState(repository, userId)
return user.contacts
.filter { it.type == contactType }
.toSet()
}
fun getAddressByRegion(userId: String, state: String): Set<Address> {
val user = UserUtility.recreateUserState(repository, userId)
return user.addresses
.filter { it.state == state }
.toSet()
}
}
object UserUtility {
fun recreateUserState(store: EventStore, userId: String): User {
var user: User? = null
val events: List<Event> = store[userId]
for (event in events) {
if (event is UserCreatedEvent) {
val e = event
user = User(e.userId, e.firstName, e.lastName)
}
if (event is UserAddressAddedEvent) {
val e = event
val address = Address(e.city, e.state, e.postCode)
user?.addresses?.add(address)
}
if (event is UserAddressRemovedEvent) {
val e = event
val address = Address(e.city, e.state, e.postCode)
user?.addresses?.remove(address)
}
if (event is UserContactAddedEvent) {
val e = event
val contact = Contact(e.contactType, e.contactDetails)
user?.contacts?.add(contact)
}
if (event is UserContactRemovedEvent) {
val e = event
val contact = Contact(e.contactType, e.contactDetails)
user?.contacts?.remove(contact)
}
}
return user!!
}
}
유저서비스에서는 상태를 바꾸지 않고 이벤트만을 저장하고 있습니다. 신기합니다. 이후에 유저를 가져올 때에는 가장 최신화된 상태의 유저를 가져오도록 UserUtility에 구현이 되어있습니다.
CQRS와 Event Sourcing에 대해 간략하게 알아보았으니 이제 두 패턴을 통합해보겠습니다. 아키텍처는 아래 그림과 같습니다.
달라진게 보이시나요? CQRS패턴에서 쓰기 저장소만 Event Store로 교체하였습니다.
Event Sourcing과 CQRS를 독립적으로 구현한 경우 이해하기 어렵지 않습니다. CQRS를 적용한 애플리케이션에 Event Sourcing 애플리케이션에서 정의한 이벤트 스토어를 추가해보겠습니다.
우선 우리는 상태를 업데이트 하는 대신 이벤트를 만들도록 Aggregate를 바꾸어야 합니다.
class UserAggregate(
private val writeRepository: EventStore
) {
fun handleCreateUserCommand(command: CreateUserCommand): List<UserCreatedEvent> {
val event = UserCreatedEvent(command.userId, command.firstName, command.lastName)
writeRepository.addEvent(command.userId , event)
return listOf(event)
}
fun handleUpdateUserCommand(command: UpdateUserCommand): List<Event> {
val user = UserUtility.recreateUserState(writeRepository, command.userId)
val events = mutableListOf<Event>()
val contactsToRemove = user.contacts
.filter { !command.contact.contains(it) }
for (contact in contactsToRemove) {
val userContactRemovedEvent = UserContactRemovedEvent(contact.type, contact.detail)
events.add(userContactRemovedEvent)
writeRepository.addEvent(command.userId, userContactRemovedEvent)
}
val contactsToAdd = command.contact
.filter { !user.contacts.contains(it) }
for (contact in contactsToAdd) {
val userContactAddedEvent = UserContactAddedEvent(contact.type, contact.detail)
events.add(userContactAddedEvent)
writeRepository.addEvent(command.userId, userContactAddedEvent)
}
// addressToRemove , addressToAdd도 생성
return events
}
}
또한 상태를 바꾸던 프로젝터도 변경을 해주어야 합니다.
class UserProjector(val readRepository: UserReadRepository) {
fun project(userId: String, events: List<Event?>) {
for (event in events) {
if (event is UserAddressAddedEvent) apply(userId, event)
if (event is UserAddressRemovedEvent) apply(userId, event)
if (event is UserContactAddedEvent) apply(userId, event)
if (event is UserContactRemovedEvent) apply(userId, event)
}
}
fun apply(userId: String, event: UserAddressAddedEvent) {
val address = Address(
event.city, event.state, event.postCode
)
val userAddress =
readRepository.getUserAddress(userId)
val addresses = userAddress.addressByRegion[address.state]!!
addresses.add(address)
userAddress.addressByRegion[address.state] = addresses
}
fun apply(userId: String?, event: UserAddressRemovedEvent) {
val address = Address(
event.city, event.state, event.postCode
)
val userAddress = readRepository.getUserAddress(userId!!)
val addresses = userAddress.addressByRegion[address.state]
addresses?.remove(address)
readRepository.addUserAddress(userId, userAddress)
}
fun apply(userId: String?, event: UserContactAddedEvent?) {
// Similarly handle UserContactAddedEvent event
}
fun apply(userId: String?, event: UserContactRemovedEvent?) {
// Similarly handle UserContactRemovedEvent event
}
}
CQRS와 Event Sourcing을 통합한 모델에서 우리가 해야할 것은 발생하는 모든 도메인 이벤트를 처리(저장)하고 그것을 모든 읽기 도메인 모델에 적용하는 것입니다. 또한 고려해야할 점이 많습니다.
이번 포스팅에서는 CQRS와 Event Soucring에 대해 간략하게 알아보고 구현하며 각각의 장단점을 파악해보았습니다.
https://www.baeldung.com/cqrs-event-sourcing-java
https://www.youtube.com/watch?v=TDhknOIYvw4&ab_channel=SpringCampbyKSUG
https://stackoverflow.com/questions/43433318/cqrs-command-return-values