CQRS & Event Sourcing

dudududu·2023년 11월 4일
1
post-thumbnail

1. 소개

이번 포스팅에서는 Command Query Responsibility Segregation (CQRS) 와 Event Sourcing 디자인 패턴에 대해 간단히 알아봅니다.

2. 기본 컨셉

우선 이 두 가지 개념을 개별적으로 이해하고 구현합니다. 이 후 두 개념을 통합하고 구현합니다.

2.1 Event Sourcing

Event Sourcing

Event Sourcing은 애플리케이션 상태를 순서가 지정된 이벤트 시퀀스로 저장하는 새로운 방식을 제공합니다. 우리는 이러한 이벤트를 선택적으로 쿼리하고 언제든지 애플리케이션 상태를 재구성할 수 있습니다.
예를 들어 상태 변경과 함께 로그를 쌓고 싶다고 할 때, 장바구니에 상품을 담았다가 삭제할 수 있습니다. 그런데 유저가 또 다시 상품을 담았다가 삭제합니다. 최종적인 관점에서 본다면 장바구니에 상품이 없어진 것으로만 볼 수 있습니다. 하지만 이 때 비즈니스적으로 맞춤광고를 내보이고 싶다면 어떨까요? 이러한 상태변화들을 기록하고 있지 않아 불가능합니다. 이럴 때 Event Sourcing을 통해 상태변화 기록들을 이용한다면 충분히 가능한 이야기가 될 수 있습니다.

몇 가지 사실을 이야기 하고 가겠습니다. (알아만 두시면 편합니다.)

  1. 상태를 저장한 후에 로그를 기록하는 것이 아니고 어떠한 변화를 나타내는 이벤트를 저장소에 기록하고 이 일련의 이벤트를 재생해서 상태를 만들어 내는 것이다.
  2. Event Sourcing은 도메인에서 발생하는 모든 사실들을 순차적으로 기록하는 패턴이다.
  3. 도메인이 Command를 받으면 그 결과로 이벤트를 발생시키고 그것을 이벤트 저장소에 저장, 이벤트 핸들러를 통해 상태로 변환하는 것이다.
  4. 영속대상은 이벤트이며 이벤트는 절대 삭제되고 수정되지 않고 오직 추가 되기만 한다.
  5. 도메인 오브젝트 하나당 도메인 이벤트 스트림이 존재한다.
  6. Command와 Event는 같은 메시지이지만 성격이 다르다. Command는 검증의 대상이지만 Event는 그 자체로 이미 벌어진 사실이며 검증 대상이 아니다.
  7. 백 만개의 이벤트를 가지는 도메인 이벤트라면? 전통적인 시스템과는 다르게 성능이슈가 발생할 수 있다.(백 만개의 데이터를 조회해야하기 때문에) 그래서 스냅샷이라는 버저닝을 진행한다. 스냅샷은 계속 추가되는 것이 아니고 갱신 되는 것이다. 이것을 롤링 스냅샷이라고 부른다.

2.2 CQRS

CQRS

간단히 말해서 CQRS는 command side와 query side를 분리하는 애플리케이션 아키텍쳐입니다.
CQRS는 CQS(Command Query Seperation)원칙을 기반으로 하며 Bertrand Meyer에 의해 제안되었습니다. Query는 결과를 반환하고 시스템의 상태를 바꾸지 않습니다. 반면 Command는 시스템의 상태를 바꾸지만 값을 반환하지는 않습니다. CQRS는 메서드가 아니라 모델을 의미하므로 메서드의 리턴까지는 제한 하지 않는 다는 의견이 있습니다. 참고
왼쪽 그림과 같이 하나의 read와 write 저장소를 사용하여도 되지만 오른쪽 그림과 같이 물리적으로 두 개의 저장소로 나눈 뒤 동기화를 유지하는 매커니즘을 도입할 수 있습니다.
이론적으로 CQRS와 Event Sourcing은 종속되지 않지만 현실에서는 이 두개는 필수적으로 종속됩니다.

3. 간단한 애플리케이션

유저의 프로필 관련한 CRUD 작업이 있는 간단한 애플리케이션을 만들어 봅니다. 이 애플리케이션을 통해 CQRS와 Event Sourcing을 설명하겠습니다.
application overview

3.1 애플리케이션 만들기

위 그림에 맞는 애플리케이션을 만들기 위해 유저관련 도메인들을 생성합니다. 간단한 예제이다 보니 고칠점이 눈에 많이 보입니다만, 무시하도록 합시다.

class User(
    val userId: String,
    val firstName: String,
    val lastName: String
) {
    var contacts: Set<Contact> = setOf()
    var addresses: Set<Address> = setOf()
}

data class Contact(
    val type: String,
    val detail: String
)

data class Address(
    val city: String,
    val state: String,
    val postcode: String
)

그리고 레퍼지토리를 생성합니다.

class UserRepository(
    private val store:MutableMap<String, User> = mutableMapOf()
) {
    fun addUser(userId: String, user: User) {
        store[userId] = user
    }

    fun getUser(userId: String): User {
        return store[userId]!!
    }
}

서비스 클래스도 만들어 줍니다.

class UserService (
    private val repository: UserRepository
) {
    fun createUser(userId: String, firstName: String, lastName: String) {
        val user = User(userId, firstName, lastName)
        repository.addUser(userId, user)
    }

    fun updateUser(userId: String, contacts: Set<Contact>, addresses: Set<Address>) {
        val user = repository.getUser(userId)
        user.contacts = contacts
        user.addresses = addresses
        repository.addUser(userId, user)
    }

    fun getContactByType(userId: String, contactType: String): Set<Contact> {
        val user = repository.getUser(userId)
        return user.contacts
            .filter { it.type == contactType }
            .toSet()
    }

    fun getAddressByRegion(userId: String, state: String): Set<Address> {
        val user = repository.getUser(userId)
        return user.addresses
            .filter { it.state == state }
            .toSet()
    }
}

악취가 조금 나도 우리가 학습하는데에는 문제가 없어보입니다.
많은 문제들이 보이지만 그 중에서 저희는 두 가지에만 집중을 할 것입니다.

3.2 이 애플리케이션의 문제

  1. 도메인 모델: 읽기와 쓰기연산이 같은 도메인에 대해 발생합니다. 이런 문제는 간단한 도메인에서는 괜찮지만, 도메인 모델이 복잡해지면 문제가 악화될 수 있습니다. 우리는 아마 읽기와 쓰기 연산의 개별적인 요구 사항에 적합하게 도메인 모델과 스토리지를 최적화 해야할 수도 있습니다.
  2. Persistence: 영속성은 항상 도메인 개체에 대해서 최신의 상태만을 저장합니다. 만약 도메인별로 로깅을 남겨야 하는 상황이라면 문제가 발생할 수 있습니다. 또는 위에서 언급했던 장바구니 예시처럼 비즈니스 요구사항이 생긴다면 까다로워질 수 있습니다.

4. CQRS의 도입

읽기연산과 쓰기연산을 다루기 위해 도메인 모델과 영속성 모델을 분리할 것입니다.
cqrs 도입

새로운 컴포넌트들이 도입된 것을 볼 수 있습니다.
Aggregate/Aggregator
Aggregate는 엔티티를 Aggregate root에 바인딩하여 다양한 엔티티를 논리적으로 그룹화하는 DDD에 설명된 패턴입니다.
Aggregate는 엔티티간의 트랜잭션 일관성을 제공합니다.
CQRS는 쓰기 도메인 모델을 그룹화하여 Aggregate 패턴의 이점을 자연스럽게 활용합니다. Aggregate는 일반적으로 더 나은 상태를 위해 캐시된 상태를 유지할 수 있지만 필수적인 것은 아닙니다.
Projection/Projector
Projection이란 본질적으로 다양한 모양과 구조로 도메인 모델을 나타내는 것을 의미합니다. 원본데이터의 Projection들은 오직 읽기 전용이며 향상된 read 경험을 제공하며 최적화 되어있습니다.

4.1 Write side 구현

우리는 필요한 Command에 대한 정의를 하면서 write side를 구현할 수 있습니다.
command는 도메인 모델의 상태를 변경하려는 의도이며 이것의 성공여부는 우리가 구성한 비즈니스 규칙에 달려있습니다. Command는 아래와 같습니다.

data class CreateUserCommand(
    val userId: String,
    val firstName: String,
    val lastName: String
)

data class UpdateUserCommand(
    val userId: String,
    val addresses: Set<Address>,
    val contact: Set<Contact>
)

다음으로 명령을 받고 처리하는 Aggregate를 정의합니다. Aggregate는 Command를 수락하거나 거부할 것입니다.

/*command용 서비스라고 이해*/
class UserAggregate(
    val writeRepository: UserWriteRepository
) {
    fun handleCreateUserCommand(command: CreateUserCommand): User {
        val user = User(command.userId, command.firstName, command.lastName)
        writeRepository.addUser(user.userId, user)
        return user
    }

    fun handleUpdateUserCommand(command: UpdateUserCommand): User {
        val user:User = writeRepository.getUser(command.userId)
        user.addresses = command.addresses
        user.contacts = command.contact
        writeRepository.addUser(user.userId, user)
        return user
    }
}

이어서 레퍼지토리도 구현합니다.

class UserWriteRepository(
    private val store:MutableMap<String, User> = mutableMapOf()
) {
    fun addUser(userId: String, user: User) {
        store[userId] = user
    }

    fun getUser(userId: String): User {
        return store[userId]!!
    }
}

4.2 Read side 구현

/*쿼리된 결과가 담길 DTO라고 이해*/
class UserAddress(
    val addressByRegion: Map<String, Set<Address>> = mapOf()
)

class UserContact(
    val contactByType: Map<String, Set<Contact>> = mapOf()
)

/*쿼리용 파라미터라고 이해*/
data class ContactByTypeQuery(
    val userId: String,
    val contactType: String
)

data class AddressByRegionQuery(
    val userId: String,
    val state: String
)

/*조회 서비스라고 이해*/
class UserProjection(
    val readRepository: UserReadRepository
) {
    fun handle(query: ContactByTypeQuery): Set<Contact> {
        val userContact = readRepository.getUserContact(query.userId)
        return userContact.contactByType[query.contactType]!!
    }

    fun handle(query: AddressByRegionQuery): Set<Address> {
        val userAddress: UserAddress = readRepository.getUserAddress(query.userId)
        return userAddress.addressByRegion[query.state]!!
    }
}

/*쿼리용 query dsl 서비스라고 이해*/
class UserReadRepository(
    private val store: MutableMap<String, User> = mutableMapOf()
) {
    fun getUserContact(userId: String): UserContact {
        val user = store[userId]!!
        return UserContact(mapOf(userId to user.contacts))
    }

    fun getUserAddress(userId: String): UserAddress {
        val user = store[userId]!!
        return UserAddress(mapOf(userId to user.addresses))
    }
}

4.3 Read and Write data의 동기화

현재 저희는 쓰기와 읽기 저장소를 동기화하는 부분이 없습니다. 이 부분에서 저희는 Projector를 알아야 합니다. Projector는 write domain 모델을 read domain 모델로 projection(투영)하는 로직을 갖고 있습니다. 소스코드는 아래와 같습니다.

class User(
    val userId: String,
    val firstName: String,
    val lastName: String
) {
    /*유저필드 변경*/
    var contacts: MutableSet<Contact> = mutableSetOf()
    var addresses: MutableSet<Address> = mutableSetOf()
}

Projector를 만들어줍니다.

class UserProjector(
    val readRepository: UserReadRepository
) {
    fun project(user: User) {
        val userContact = readRepository.getUserContact(user.userId)
        val contactByType: MutableMap<String, MutableSet<Contact>> = mutableMapOf()
        for (contact in user.contacts) {
            val contacts = contactByType[contact.type] ?: mutableSetOf()
            contacts.add(contact)
            contactByType[contact.type] = contacts
        }
        userContact.contactByType = contactByType
        readRepository.addUserContact(user.userId, userContact)

        val userAddress = readRepository.getUserAddress(user.userId)
        val addressByRegion: MutableMap<String, MutableSet<Address>> = mutableMapOf()
        for (address in user.addresses) {
            val addresses = addressByRegion[address.state] ?: mutableSetOf()
            addresses.add(address)
            addressByRegion[address.state] = addresses
        }
        userAddress.addressByRegion = addressByRegion
        readRepository.addUserAddress(user.userId, userAddress)
    }
}

굉작히 조악해보이지만 충분한 인사이트를 얻을 수 있습니다. 위의 코드는 사실 필수가 아닙니다. 읽기와 쓰기 저장소를 물리적으로 다른 곳에 위치시키지 않고 한 곳의 저장소에서도 이용가능합니다. 이렇게 되면 동기화시키는 작업이 따로 필요가 없게 되겠죠?

4.4 CQRS 패턴의 장단점

장점

  1. 읽기와 쓰기 연산에 적합한 분리된 도메인 모델을 선택하는 편리한 방법을 제공합니다.
  2. 높은 쓰기 처리량, 낮은 읽기 레이턴시 등 읽기와 쓰기 연산의 복잡성을 처리하기 위해 개별적으로 적합한 저장소를 선택할 수 있도록 도움을 줍니다.
  3. 관심사를 분리하여 분산 아키텍처에서 이벤트기반 프로그래밍 모델을 자연스럽게 보완합니다.

단점

  1. 단순한 도메인보단 복잡한 도메인이 이 패턴을 적용하여 이득을 얻을 수 있습니다.
  2. 어느정도 코드 중복을 발생시킵니다만, 우리가 얻을 수 있는 이득에 비하면 받아드릴수 있는 정도입니다.
  3. 분리된 저장소는 일관성 문제를 필연적으로 일으킵니다. 두 개의 저장소를 완벽하게 동기화 하는 것은 어렵습니다.

5. Event Sourcing의 도입

이벤트 소싱 적용

abstract class Event(
    val id: UUID = UUID.randomUUID(),
    val created: Date = Date()
)

보시다시피 저희가 만드는 모든 이벤트는 유니크한 id와 타임스탬프를 갖습니다. 추가적인 데이터가 필요하시다면 필드를 추가하셔도 무방합니다.


class UserCreatedEvent(
    val userId: String,
    val firstName: String,
    val lastName: String
) : Event()

class UserContactAddedEvent(
    val contactType: String,
    val contactDetails: String
) : Event()

class UserContactRemovedEvent(
    val contactType: String,
    val contactDetails: String
) : Event()

class UserAddressAddedEvent(
    val city: String,
    val state: String,
    val postCode: String
) : Event()

class UserAddressRemovedEvent(
    val city: String,
    val state: String,
    val postCode: String
) : Event()

Event를 상속받는 세분화된 이벤트들을 만들었습니다. 이 때 과거시제를 사용하여 클래스 네이밍을 해야합니다.

class EventStore(
    val store: MutableMap<String, List<Event>> = mutableMapOf()
)

이벤트들을 순차적으로 저장할 수 있는 저장소를 만들었습니다.

5.2 이벤트를 생성하고 소비하기

이벤트 저장소 코드를 아래와 같이 바꿔줍니다.

class EventStore(
    private val store: MutableMap<String, MutableList<Event>> = mutableMapOf()
) {
    fun addEvent(userId: String, event: Event) {
        var events: MutableList<Event>? = store[userId]
        if (events == null) {
            events = mutableListOf()
            events.add(event)
            store[userId] = events
        } else {
            events.add(event)
        }
    }

    operator fun get(userId: String): MutableList<Event> {
        return store[userId]?: mutableListOf()
    }
}

이후에 실제로 저장소를 사용하는 곳의 코드를 작성하면 아래와 같습니다.

class UserService(
    private val repository: EventStore
) {
    fun createUser(userId: String, firstName: String, lastName: String) {
        repository.addEvent(userId, UserCreatedEvent(userId, firstName, lastName))
    }

    fun updateUser(userId: String, contacts: Set<Contact>, addresses: Set<Address>) {
        val user:User = UserUtility.recreateUserState(repository, userId)
        user.contacts
            .filter { c -> !contacts.contains(c) }
            .forEach { repository.addEvent(userId, UserContactRemovedEvent(it.type, it.detail)) }
        contacts
            .filter { !user.contacts.contains(it) }
            .forEach { repository.addEvent(userId, UserContactAddedEvent(it.type, it.detail)) }
        user.addresses
            .filter { !addresses.contains(it)}
            .forEach { repository.addEvent(userId, UserAddressRemovedEvent(it.city, it.state, it.postcode)) }
        addresses
            .filter { !user.addresses.contains(it) }
            .forEach { repository.addEvent(userId, UserAddressAddedEvent(it.city, it.state, it.postcode)) }
    }

    fun getContactByType(userId: String, contactType: String): Set<Contact> {
        val user = UserUtility.recreateUserState(repository, userId)
        return user.contacts
            .filter { it.type == contactType }
            .toSet()
    }

    fun getAddressByRegion(userId: String, state: String): Set<Address> {
        val user = UserUtility.recreateUserState(repository, userId)
        return user.addresses
            .filter { it.state == state }
            .toSet()
    }
}

object UserUtility {
    fun recreateUserState(store: EventStore, userId: String): User {
        var user: User? = null

        val events: List<Event> = store[userId]
        for (event in events) {
            if (event is UserCreatedEvent) {
                val e = event
                user = User(e.userId, e.firstName, e.lastName)
            }
            if (event is UserAddressAddedEvent) {
                val e = event
                val address = Address(e.city, e.state, e.postCode)
                user?.addresses?.add(address)
            }
            if (event is UserAddressRemovedEvent) {
                val e = event
                val address = Address(e.city, e.state, e.postCode)
                user?.addresses?.remove(address)
            }
            if (event is UserContactAddedEvent) {
                val e = event
                val contact = Contact(e.contactType, e.contactDetails)
                user?.contacts?.add(contact)
            }
            if (event is UserContactRemovedEvent) {
                val e = event
                val contact = Contact(e.contactType, e.contactDetails)
                user?.contacts?.remove(contact)
            }
        }
        return user!!
    }
}

유저서비스에서는 상태를 바꾸지 않고 이벤트만을 저장하고 있습니다. 신기합니다. 이후에 유저를 가져올 때에는 가장 최신화된 상태의 유저를 가져오도록 UserUtility에 구현이 되어있습니다.

5.3 Event Sourcing의 장단점

장점

  1. 읽기, 업데이트 및 쓰기가 필요없기 때문에 쓰기 작업속도가 훨씬 빠릅니다. 쓰기는 이벤트를 저장하는 단계밖에 없습니다.
  2. 객체 관계 임피던스를 제거함에 따라 복잡한 매핑도구가 필요하지 않습니다. 객체지향적인 모델을 RDBMS에다가 저장하는 것은 상당히 직관적이지 않습니다. 하지만 이벤트를 저장하게 되면 키, 값 형태로만 저장하면 되니 원하는 형태로 데이터를 넣을 수 있는 장점이 있습니다.(json..)
  3. 자연스럽게 컴포넌트간 결합도를 낮춥니다. MSA와 궁합이 잘 맞습니다.

단점

  1. 학습곡선이 필요하며 직관적이지 않습니다.
  2. 상태를 로컬캐시에 저장해두고 있지 않으면 상태를 다시 생성해야 하므로 오히려 일반적인 쿼리를 처리하기엔 까답로워집니다.
  3. 모든 도메인 모델에 적용할 수는 있지만 이벤트 중심의 아키텍처 모델의 더 적합합니다.

6. CQRS with Event Sourcing

CQRS와 Event Sourcing에 대해 간략하게 알아보았으니 이제 두 패턴을 통합해보겠습니다. 아키텍처는 아래 그림과 같습니다.
cqrs-es
달라진게 보이시나요? CQRS패턴에서 쓰기 저장소만 Event Store로 교체하였습니다.

6.1 통합

Event Sourcing과 CQRS를 독립적으로 구현한 경우 이해하기 어렵지 않습니다. CQRS를 적용한 애플리케이션에 Event Sourcing 애플리케이션에서 정의한 이벤트 스토어를 추가해보겠습니다.
우선 우리는 상태를 업데이트 하는 대신 이벤트를 만들도록 Aggregate를 바꾸어야 합니다.

class UserAggregate(
    private val writeRepository: EventStore
) {
    fun handleCreateUserCommand(command: CreateUserCommand): List<UserCreatedEvent> {
        val event = UserCreatedEvent(command.userId, command.firstName, command.lastName)
        writeRepository.addEvent(command.userId , event)
        return listOf(event)
    }

    fun handleUpdateUserCommand(command: UpdateUserCommand): List<Event> {
        val user = UserUtility.recreateUserState(writeRepository, command.userId)
        val events = mutableListOf<Event>()
        val contactsToRemove = user.contacts
            .filter { !command.contact.contains(it) }
        for (contact in contactsToRemove) {
            val userContactRemovedEvent = UserContactRemovedEvent(contact.type, contact.detail)
            events.add(userContactRemovedEvent)
            writeRepository.addEvent(command.userId, userContactRemovedEvent)
        }
        val contactsToAdd = command.contact
            .filter { !user.contacts.contains(it) }
        for (contact in contactsToAdd) {
            val userContactAddedEvent = UserContactAddedEvent(contact.type, contact.detail)
            events.add(userContactAddedEvent)
            writeRepository.addEvent(command.userId, userContactAddedEvent)
        }
        
        // addressToRemove , addressToAdd도 생성
        return events
    }
}

또한 상태를 바꾸던 프로젝터도 변경을 해주어야 합니다.

class UserProjector(val readRepository: UserReadRepository) {

    fun project(userId: String, events: List<Event?>) {
        for (event in events) {
            if (event is UserAddressAddedEvent) apply(userId, event)
            if (event is UserAddressRemovedEvent) apply(userId, event)
            if (event is UserContactAddedEvent) apply(userId, event)
            if (event is UserContactRemovedEvent) apply(userId, event)
        }
    }

    fun apply(userId: String, event: UserAddressAddedEvent) {
        val address = Address(
            event.city, event.state, event.postCode
        )
        val userAddress = 
            readRepository.getUserAddress(userId)
        val addresses = userAddress.addressByRegion[address.state]!!
            
        addresses.add(address)
        userAddress.addressByRegion[address.state] = addresses
            
    }

    fun apply(userId: String?, event: UserAddressRemovedEvent) {
        val address = Address(
            event.city, event.state, event.postCode
        )
        val userAddress = readRepository.getUserAddress(userId!!)
        val addresses = userAddress.addressByRegion[address.state]
        addresses?.remove(address)
        readRepository.addUserAddress(userId, userAddress)
    }

    fun apply(userId: String?, event: UserContactAddedEvent?) {
        // Similarly handle UserContactAddedEvent event
    }

    fun apply(userId: String?, event: UserContactRemovedEvent?) {
        // Similarly handle UserContactRemovedEvent event
    }
}

CQRS와 Event Sourcing을 통합한 모델에서 우리가 해야할 것은 발생하는 모든 도메인 이벤트를 처리(저장)하고 그것을 모든 읽기 도메인 모델에 적용하는 것입니다. 또한 고려해야할 점이 많습니다.

고려사항

  1. 익숙하지 않음
  2. 가파른 학습곡선
  3. 비동기와 최종 정합
  4. 과도한 엔지니어링
  5. 유일성 제약
  6. 도구의 부족

이번 포스팅에서는 CQRS와 Event Soucring에 대해 간략하게 알아보고 구현하며 각각의 장단점을 파악해보았습니다.

참고

https://www.baeldung.com/cqrs-event-sourcing-java
https://www.youtube.com/watch?v=TDhknOIYvw4&ab_channel=SpringCampbyKSUG
https://stackoverflow.com/questions/43433318/cqrs-command-return-values

profile
기술을 통해 비즈니스 프로세스를 최적화하는 백엔드 개발자입니다.

0개의 댓글