B마트 전시 도메인 CQRS 적용하기
공유 대상
- CQRS 패턴이 생소한 분
- CQRS 구성하게 된 이유가 궁금한 분
- 아키텍처를 어떻게 구성할 지 고민중인 분
- B마트는 어떻게 했는지 궁금한 분
데이터 구조


- ✅ 데이터 관리를 위해 정규화 시킨 데이터와
↔️ 노출을 위한 데이터의 구조는 전혀 다르다!!
특히나 관계형 디비를 사용할 때!
- 전시 도메인에서는 데이터 노출을 위해 정규화 된 데이터를 비정규화 하는 작업이 필요하다.
CQRS
Command and Query Responsibility Segregation : 명령과 조회의 책임을 분리한다.

- 어드민/배치 처럼 명령을 수행하는 부분과, 고객이 조회를 하는 영역은 비즈니스 로직은 나눠지지만 모델은 하나의 모델이 명령/조회 둘 다 수행하는 경우가 많다.

- 점점 서비스가 성장해가면서, 조회에서는 사용되지 않는 내부용 관리 데이터들이 생기게 된다.
- 노출 측면에서는 정책 사항이나, 외부에서 주입받는 데이터(특히, 재고 정보나 배달 정보..)가 생기게 된다.
- 이로인해 도메인(모델)이 점점 더 복잡해지는 상황이 발생한다.
- 서로 영향을 주게되며 리팩토링도 어려워지게 됨

- 명령과 조회 영역을 나눠 문제를 해결하는 것이 CQRS가 바라는 방향이다!
CQRS 적용하기
1단계: 모델 분리하기

- 명령-조회 두 교집합 지점인 모델을 분리한다!

- 조회모델에서는 DTO 사용하는 것을 추천
성능의 한계

- 정규화된 데이터를 조회 모델에서 이용하기 위해서는 다시 비정규화 하는 작업이 필요하게 될텐데,
- 성능을 위해 중간에 캐시를 두게 된다.
생성/저장과 조회 시점을 나눈다

- CQRS 모델에서 조회 모델은 데이터를 가져오는 과정에서 JOIN 연산 같은 작업을 제한해야 한다!
- 가능하면 디비의 값 그대로 이용 가능한 형태로 제공해야 한다. (권장)
- 조회를 위한 새로운 테이블을 설계해야 한다.
(비정규화 된 형태 그대로 저장한다. json 형태가 많기 때문에 NoSQL을 사용하게 되는 것!!!)
- 명령이 적고 많은 조회가 발생하는 시스템에서 큰 이점이 있다!

성능 더 높이기

- 조회쪽에서는 쿼리 성능 높일 수 있는 다양한 데이터 저장소를 사용할 수 있다.
- 단, 데이터 저장소가 나뉘게 되면 두 데이터의 정합성을 보장해야 하고!! 관리의 요소가 늘어난 상태이므로, 시스템 안정이 될때까진 모니터링에 힘써야 한다.
책임과 성능에 대한 부담

- 조회 모델 생성과 저장을 명령 모델쪽에서 책임을 지게 된다..;;;
이벤트 소싱

- 조회 모델 생성과 저장 책임을 떠맡지 않기 위해 이벤트 소싱 패턴을 활용한다!
- 어떤 어플리케이션에서 발행한 이벤트를 이벤트 스토어에 저장하고, 이를 여러 시스템에서 구독해서 다룰 수 있다.
- 이벤트 소싱 패턴을 활용해서 데이터를 관리하는 측과, 조회 모델을 생성하는 측을 분리할 수 있게된다!!
최종 그림

언제 CQRS?
- UX / 비지니스 요구 사항이 복잡해질 때
- 조회 성능을 보다 높이고 싶을떄! 사용자가 많아지거나 트래픽 적 부담이 있을 때
- 데이터 관리하는 영역과, 뷰로 전달하는 영역의 책임이 나뉘어져야 할 때
- 시스템 확장성을 높이고 싶을 때
변경 감지 방법
(추상화 수준은 아래로 갈수록 낮아짐)
1. JPA EntityListeners
2. Hibernate - EventListener ✅
3. Hibernate Interceptor
4. Spring AOP ✅
JPA EntityListeners
(가장 추상화된 레벨, 사용하기 간편)
@Entity 혹은 @MappedSuperclass 객체 메소드에 어노테이션 지정으로 사용 가능
- Callback 지정 함수 선언 가능
(@PrePersist , @PostPersist , @PreUpdate 등 7가지...)
@EntityListeners 어노테이션과 함께 Callback listener Class 구현이 가능
Hibernate EventListener
(Hibernate 5.4v)
- SessionFactoryImpl -> SessionFactoryServiceRegistry -> EventListenerRegistry
- 26가지 디테일한 상황에 콜백받을 수 있다.
- 받고자 하는 상황에 따른 인터페이스를 구현한 클래스를 등록
- 보다 상세한 정보가 전달된다
(변경된 프로퍼티, 이전 상태, 현재 상태 등)
- 모든 엔티티 변경 사항이 전달된다.
Hibernate Interceptor
- Session or SessionFactory 에 인터셉터 등록이 가능하다
- EventListener 에 비해 적은 콜백 종류
- 저장될 데이터 조작 가능
Spring AOP
- 메서드에만 설정 가능
- 메서드 실행 전/후, 반환 후, 예외 상황, 어노테이션 붙은 경우 등.
- Pointcut 문법 동작
- 특정 케이스에만 적용
=> ✔️ 2, 4번 설정을 채택했다.
하이버네이트 이벤트리스너 예제코드
@Component
public class TestPreInsertEventListener implements PreInsertEventListener {
@Autowired
private ExcutorService excutorService;
@Autowired
private DisplayRelatedEntityUpdatedPublisher entityUpdatePublisher;
@Override
public boolean onPreInsert(PreInsertEvent event) {
if (event.getEntity() instanceof DisplayRelatedEntity) {
excutorService.submit(() -> {
entityUpdatePublisher.publishMessage(null);
});
return false;
}
return false;
}
}
- 생성 이전 시점에 콜백을 받을 수 있다.
- return true 인 경우 생성을 거부할 수 있다.
@Component
public class TestEventListenerRegister {
@Autowired
private EntityManageFactory entityManagerFactory;
@PostConstruct
public void init() {
SessionFactory sessionFactory = entityManagerFactory.unwrap(SessionFactory.class);
EventListenerRegistry registry = ((SessionFactoryImpl) sessionFactory)
.getServiceRegistry().getService(EventListenerRegistry.class);
registry.getEventListenerGroup(EventType.PRE_INSERT).appendListener(new TestPreInsertEventListener());
}
}
- 만들어진 이벤트 리스너를 세션 팩토리에 등록한다.
@Around("execution(* org.springframework.data.jpa.repository.JpaRepository+.deleteInBatch(..)))")
public Object process(ProceedingJoinPoint joinPoint) throws Throwable {
try {
Iterable<Object> iterable = (Iterable<Object>) joinPoint.getArgs()[0];
iterable.forEach(it -> {
if (!(it instanceof DisplayRelatedEntity)) {
return;
}
entityUpdatePublisher.publishMessage(null);
});
} cath (Exception e) {
log.warn("Entity Event 발생 중 오류발생했습니다. ({})", e.getMessage(), e);
}
return joinPoint.proceed();
}
- JPA 메소드 중에 DELETE IN BATCH 메소드는 실행 시 바로 쿼리가 수행되어서, 엔티티 리스너가 동작하지 않는다. 때문에 AOP 를 적용하게 되었다.
이벤트 발행하기
- Amazon SNS, Amazon SQS
(spring-cloud-starter-aws-messaging)
- Amazon SNS : 이벤트 발행 시 연결된 여러 서비스에 이벤트를 뿌려주는 역할을 한다. (전파)
Amazon SQS와 연결 설정이 간편하다.
Amazon SQS만으로도 이벤트 발행은 가능하나, 여러개의 큐를 등록 가능하며 확장성을 위해 SNS를 사용하였다.
- Amazon SNS 사용법
PublishRequest publishRequest = new PublishRequest()
.withTopicArn("SNS ARN")
.withMessage("message")
.addMessageAttributesEntry("contentType",
new MessageAttributeValue()
.withDataType("String"));
publishResult = amazonSNSClient.publish(publishRequest);
- Amazon SQS

이벤트 처리하기


- 단건 발행 이벤트를 일일히 DB에 반영하는 비효율 해결을 위해
redis : 변경 대상을 저장 (버퍼 역할로 씀)
scheduler : 데이터 정합성을 보장하기 위해 매시간 마다 데이터를 저장하고 있다.
최종 그림

약 2개월의 개발 소요시간...
약 6개월의 모니터링 및 안정화 작업
아쉬운 점
- 엔티티 변경 감지 부분
Hibernate EventLister + Spring AOP
- Hibernate 의존적
- 쿼리 직접 실행시키는 로직의 예외처리 필요
- 기존 엔티티에 추가 작업 필요
- 이벤트 소싱
- SNS: 계정당 TPS 제한
- 애플리케이션 코드 기반 이벤트 퍼블리싱
- 너무 기본적인 큐 기능만 제공
- Change Data Capture (CDC) 방식을 검토 중...
- CDC기술로는 Kafka Connect 가 있다!
