[마이크로서비스 패턴] 6. 비즈니스 로직 개발: 이벤트 소싱

DaeHoon·2023년 11월 14일
0

6.1 이벤트 소싱 응용 비즈니스 로직 개발

  • 이벤트 소싱: 상태 변화를 나타내는 일련의 도메인 이벤트로 애그리거트를 저장한다.

6.1.1 기존 영속화의 문제점

  • 클래스는 DB 테이블에 클래스 필드는 테이블 컬럼에, 클래스 인스턴스는 테이블 각 로우에 매핑하는 것이 기존 영속화 방식
  • 일반적으로 JPA 같은 ORM 프레임워크나 마이바티스 등 저수준 프레임워크를 사용하여 주문 인스턴스를 ORDERORDER_LINE_ITEM 테이블의 로우 단위로 저장한다.

단점 및 한계

  • 객체-관계 임피던스 부정합
    • 객체 모델과 관계형 모델이 서로 잘 어울리지 않는다는 것을 말하는 공상적인 이야기
  • 애그리거트 이력이 없다
  • 감사 로깅은 구현하기 힘들고 오류도 자주 발생한다
  • 이벤트 발행 로직이 비즈니스 로직에 추가된다
    • 데이터를 업데이트하는 트랜잭션의 일부로 메시지를 자동 발행하는 기능이 없음. -> 이벤트 소싱으로 해결

6.1.2 이벤트 소싱 개요

  • 이벤트 소싱은 이벤트를 위주로 비즈니스 로직을 구현하고, 애그리거트를 DB의 일련의 이벤트로 저장하는 기법

이벤트를 이용하여 애그리거트 저장

  • 기존 영속화: 애그리거트를 테이블, 필드를 컬럼에, 인스턴스를 로우에 매핑
  • 이벤트 소싱: 도메인 이벤트 개념에 기반한 새로운 방식. 즉 애그리거트 DB에 있는 이벤트 저장소에 일련의 이벤트로 저장

  • 애그리거트 생성/수정 시 애플리케이션은 애그리거트가 발생시킨 이벤트를 EVENTS 테이블에 삽입
  • 애그리거트를 로드할 떄 이벤트 저장소에서 이벤트를 가져와 재연을 하는데, 구체적으로 3단계로 구성이 된다.
    • 애그리거트의 이벤트를 로드한다.
    • 기본 생성자를 호출하여 애그리거트 인스턴스를 생성한다.
    • 이벤트를 하나씩 순회하여 apply()를 호출한다.

이벤트는 곧 상태 변화

  • 도메인 이벤트는 애그리거트의 변경을 구독자에게 알리는 장치, 이벤트는 애그리거트 ID 같은 최소한의 필수 데이터만 넣거나 컨슈머에 유용한 데이터까지 포함시켜 강화할 수 있다.
    • 주문 서비스가 주문 생성 시 발행하는 OrderCreatedEvent에는 orderId만 넣어도 되고, 이 이벤트를 받는 컨슈머가 따로 주문 서비스 데이터를 조회할 필요가 없도록 주문 정보를 몽땅 포함시킬 수도 있다.
  • 어떤 이벤트에 무엇을 전달할지는 컨슈머의 필요에 따라 좌우되지만, 이벤트 소싱에서는 주로 애그리거트에 의해 이벤트 및 그 구조가 결정된다.
  • 이벤트 소싱에서는 이벤트가 필수. 생성을 비롯한 모든 애그리거트의 상태 변화를 도메인 이벤트로 나타내며, 상태가 바뀔 때마다 반드시 이벤트를 발행시킨다.
    • Order 애그리거트는 주문 생성 시 OrderCreatedEvent를, 그 이후로는 업데이트 될 때 마다 OrderEvent를 발생시킨다.
  • 이벤트는 애그리거트가 상태 전이를 하기 위해 필요한 데이터를 가지고 있어야 한다. Order.state처럼 객체 필드 값을 바꾸는 정도의 간단한 상태 변화도 있지만, 주문 품목 변경 등 객체의 추가/삭제 작업이 동반되는 상태 변화도 있음.

  • 애그리거트의 현재 상태는 S, 새 상태는 S'다. 상태 변화를 나타내는 이벤트 E에는 Order 상태가 S면 order.apply(E)를 호출하여 Order 상태를 S'로 업데이트할 수 있게 데이터가 들어있어야 한다.
  • OrderShippedEvent 같은 이벤트는 데이터는 거의 없고 상태 전이만 나타냄. apply()는 Order 상태를 SHIPEED로 변경할 뿐이지만 다른 이벤트에는 많은 데이터가 들어있다.
    • OrderCreatedEventapply()가 Order를 비롯하여 주문 품목 및 지불 정보, 배달 정보 등을 초기화하기 위해 필요한 데이터를 모두 갖고 있어야 한다.
    • 애그리거트를 이벤트로 저장하기 때문에 orderId만 이벤트가 존재할 수 없음.

애그리거트 메서드의 관심사는 오직 이벤트

  • 비즈니스 로직은 애그리거트의 업데이트 요청을 애그리거트 루트에 있는 커맨드 메서드를 호출하여 처리.

  • 기존에는 커맨드 메서드가 매개변수를 검증한 후 하나 이상의 애그리거트 필드를 업데이트 했지만, 이벤트 소싱을 사용하면 커맨드 메서드가 반드시 이벤트를 발행시킨다.

  • 이벤트 소싱은 커맨드 메서드 하나를 둘 이상의 메서드로 리팩토링한다.

    • 첫 번째 메서드는 요청을 나타낸 커맨드 객체를 매개변수로 받아 상태를 어떻게 변경해야 할지 결정
    • 두 번째 메서드는 각자 정해진 이벤트 타입을 매개변수로 받아 애그리거트를 업데이트 한다.

  • revise() 메서드는 process()apply()로 대체된다.
    • process()가 매개변수로 받은 ReviseOrder 커맨드 클래스는 매개변수 객체 도입 패턴 (Introcude Parameter Object)에 따라 리팩토링
      • 별 문제 없을 경우 OrderRevisionProposed 이벤트를 반환
        • 주문을 변경할 타이밍을 놓쳤거나 최소 주문량 요건에 맞지 않을 경우 예외를 던진다.
    • OrderRevisionProposed에 대응되는 apply()는 주문 상태를 REVISION_PENDING으로 변경한다.
  • 애그리거트는 다음 순서대로 생성된다.
    • 기본 생성자로 애그리거트 루트를 초기화
    • process()를 호출하여 새 이벤트를 발생
    • 새 이벤트를 하나씩 순회하면서 apply()를 호출하여 애그리거트를 업데이트
    • 이벤트 저장소에 새 이벤트를 저장한다.
  • 또한 애그리거트는 다음 순서대로 업데이트 된다.
    • 이벤트 저장소에서 애그리거트 이벤트를 로그한다.
    • 기본 생성자로 애그리거트 루트를 초기화한다.
    • 가져온 이벤트를 하나씩 순회하며 애그리거트 루트의 apply()를 호출한다.
    • process()를 호출하여 새 이벤트를 발생시킨다.
    • 새 이벤트를 순회하면서 apply()를 호출하여 애그리거트를 업데이트한다.
    • 이벤트 저장소에 새 이벤트를 저장한다.

이벤트 소싱 기반의 Order 애그리거트

// 예제 6-1 Order 애그리거트 필드와 인스턴스 초기화 메서드 (애그리거트 생성)
public class Order {

  private OrderState state;
  private Long consumerId;
  private Long restaurantId;
  private OrderLineItems orderLineItems;
  private DeliveryInformation deliveryInformation;
  private PaymentInformation paymentInformation;
  private Money orderMinimum;


  public Order() {
  }

  public List<Event> process(CreateOrderCommand command) { // 커맨드 검증 후 OrderCreatedEvent 반환
    return events(new OrderCreatedEvent(command.getOrderDetails()));
  }

  public void apply(OrderCreatedEvent event) { // Order 필드를 초기화해서 OrderCreatedEvent 적용
    OrderDetails orderDetails = event.getOrderDetails();
    this.orderLineItems = new OrderLineItems(orderDetails.getLineItems());
    this.orderMinimum = orderDetails.getOrderMinimum();
    this.state = CREATE_PENDING;
  }
  
  • 클래스 필드는 JPA와 비슷하나 애그리거트에 애그리거트 ID를 저장하지 않음.
  • 팩토리 메서드 createOrder() -> process(), apply()로 변경
    • process()CreateOrderCommand를 받아 OrderCreatedEvent를 발생시키고 apply()OrderCreatedEvent를 받아 Order의 각 필드를 초기화한다.
// 예제 6-2 process(), apply() 메서드로 Order 애그리거트를 변경한다.

public class Order {

public List<Event> process(ReviseOrder command) { // 변경 가능한 Order인지, 변경 주문 수량이 최소 주문량 이상인지 확인
  OrderRevision orderRevision = command.getOrderRevision();
  switch (state) {
    case APPROVED:
      LineItemQuantityChange change =
              orderLineItems.lineItemQuantityChange(orderRevision);
      if (change.newOrderTotal.isGreaterThanOrEqual(orderMinimum)) {
        throw new OrderMinimumNotMetException();
      }
      return singletonList(new OrderRevisionProposed(orderRevision,
                            change.currentOrderTotal, change.newOrderTotal));

    default:
      throw new UnsupportedStateTransitionException(state);
  }
}

public void apply(OrderRevisionProposed event) {                    #2
  this.state = REVISION_PENDING;
}

public List<Event> process(ConfirmReviseOrder command) {            #3
  OrderRevision orderRevision = command.getOrderRevision();
  switch (state) {
    case REVISION_PENDING:
      LineItemQuantityChange licd =
            orderLineItems.lineItemQuantityChange(orderRevision);
      return singletonList(new OrderRevised(orderRevision,
              licd.currentOrderTotal, licd.newOrderTotal));
    default:
      throw new UnsupportedStateTransitionException(state);
  }
}


public void apply(OrderRevised event) {                            #4
  OrderRevision orderRevision = event.getOrderRevision();
  if (!orderRevision.getRevisedLineItemQuantities().isEmpty()) {
    orderLineItems.updateLineItems(orderRevision);
  }
  this.state = APPROVED;
}
  • revise(), comfirmRevision(), rejectRevision() 세 메서드는 이벤트 소싱 버전에서 process(), apply()로 바뀜.
  • revise() -> process(ReviseOrder), apply(OrderRevisionProposed)
  • confirmRevision() -> process(ConfirmReviseOrder), apply(OrderRevised)

6.1.3 동시 업데이트: 낙관적 잠금

UPDATE AGGREGATE ROOT_TABLE
SET VERSION = VERSION + 1 ...
WHERE VERSION = <원본 버전>
  • 기존 영속화 메커니즘: 한 트랜잭션이 다른 트랜잭션의 변경을 덮어 쓰지 못하게 낙관적 잠금(버전 컬럼을 이용하여 마지막으로 애그리거트를 읽은 이후 변경되었는지 감지)을 하여 처리
  • 애그리거트 루트를 VERSION 컬럼이 있는 테이블에 매핑하고 애그리거트가 업데이트 될 때마다 UPDATE 문으로 값을 하나씩 증가
  • 애플리케이션이 애그리거트를 읽는 시점에 버전이 바뀌지 않았다면 UPDATE문은 성공. 그러나 두 트랜잭션이 같은 애그리거트를 읽는다면 첫 번째 트랜잭션은 성공적으로 애그리거트를 업데이트하고, 두 번째 트랜잭션은 그 사이 버전 번호가 바뀌었으니 실패한다.

6.1.4 이벤트 소싱과 이벤트 발행

이벤트 발행: 폴링

폴링: 하나의 장치(또는 프로그램)이 충돌 회피 또는 동기화 처리 등을 목적으로 다른 장치(또는 프로그램)의 상태를 주기적으로 검사하여 일정한 조건을 만족할 때 송수신 등의 자료처리를 하는 방식

  • 이벤트를 EVENT 테이블에 저장한다고 가정하면 이벤트 발행기는 SELECT문으로 새 이벤트를 계속 폴링하면서 메시지 브로커에 발행한다.
  • 만약 EVENT_ID가 단순히 1만큼 증가하면, 이벤트 발행기가 자신이 처리한 마지막 EVENT_ID를 기록하면 될 것 같다. 새 이벤트는 SELECT * FROM EVENTS WHERE EVENTS_ID > ? ORDER BY EVENT_ID ASC 쿼리로 가져온다. 그러나 트랜잭션이 이벤트를 발행시키는 순서와 다르게 커밋할 수 있다는 점이 문제다.

  • 트랜잭션 A가 EVENT_ID = 1010 이벤트를 삽입하면, 그 다음 트랜잭션 B가 EVENT_ID = 1020 이벤트를 삽입 후 커밋한다.
    • 이 시점에 이벤트 발행기가 EVENT 테이블을 쿼리하면 1020 이벤트가 조회된다. 만약 트랜잭션 A가 커밋된 후 1010이 보이기 시작해도 이미 버스는 떠난 후라 이벤트 발행기는 1010을 무시한다.
  • 이러한 문제를 해결하는 방법은 EVENT 테이블에 이벤트 발행 여부를 추적할 수 있는 컬럼을 추가하는 것이다.
    • SELECT * FROM EVENTS WHERE PUBLISHED = 0 ORDER BY EVENT_ID ASC 쿼리로 미발행 이벤트를 검색
    • 메시지 브로커에 이벤트를 발행
    • UPDATE EVENTS SET PUBLISHED = 1 WHERE EVENT_ID = ? 이벤트가 발행된 것으로 표시

이벤트 발행: 트랜잭션 로그 테일링

6.1.5 스냅샷으로 성능 개선

  • Order 애그리거트는 상태 전이가 별로 없는 편이라 이벤트가 많지 않다. 이러한 이벤트들은 DB를 쿼리해서 Order 애그리거트를 재구성하는 것이 효율적
  • 반면에 Account 애그리거트처럼 수명이 긴 애그리거트는 이벤트 수가 꽤 많아서 일일이 로드/폴드하기 쉽지 않다. 그래서 주기적으로 애그리거트 상태의 스냅샷을 저장한다.

  • 스냅샷 버전이 N이면 N+1 이후에 발생한 이벤트 2개만 가져오면 애그리거트 상태를 되살릴 수 있다. 그 이전 이벤트 N개는 이벤트 저장소에서 가져올 필요가 ㅇ벗다.
  • 스냅샷에서 애그리거트 상태를 복원하려면, 스냅샷으로부터 애그리거트 인스턴스를 생성한 후 이벤트를 하나씩 순회하며 적용한다.

6.1.6 멱등한 메시지 처리

  • 메시지 브로커가 동일한 메시지를 여러 번 전송할 가능성이 있으므로 메시지 컨슈머는 멱등하게 개발해야 한다.
  • 비즈니스 로직이 애그리거트를 생성/수정하는 로컬 ACID 트랜잭션의 일부로 처리한 메시지 ID를 PROCESSED_MESSAGES 테이블에 기록한다. 이 테이블에 메시지 ID가 있으면 중복 메시지

RDBMS 이벤트 저장소 사용

  • 메시지 ID는 PROCESSED_MESSAGES 테이블에, 이벤트는 EVENTS 테이블에 삽입하는 트랜잭션의 일부로 삽입하면 된다.

NoSQL 이벤트 저장소 사용

  • 메시지 컨슈머는 이벤트를 저장하고 메시지 ID를 기록하는 작업을 원자적으로 처리해야 한다.
    • 메시지 컨슈머가 메시지 처리 도중 생성된 메시지 ID를 저장한다. 해당 메시지 ID가 애그리거트의 이벤트에 있는지 확인하면 중복 메시지 여부를 알 수 있다.
  • 그러나 메시지 처리 결과 아무 이벤트도 생성되지 않을 수도 있다.
    • 이벤트가 없다는 것은 메시지 처리 기록 또한 전무하다는 뜻이고, 이후에 같은 메시지를 재전달/재처리하면 이상하게 동작할 수 있다.
      • 메시지 A는 처리되나, 애그리거트는 업데이트 되지 않는다.
        • 메시지 B가 처리되고 메시지 컨슈머는 애그리거트를 업데이트한다.
        • 메시지 A가 재전달되고 처리 기록이 없기 때문에 메시지 컨슈머는 애그리거트를 업데이트 한다.
        • 메시지 B는 다시 처리되고....
  • 해결 방법은 항상 이벤트를 발행하는 것이다. 애그리거트가 이벤트를 발생시키지 않을 경우, 오직 메시지 ID를 기록할 목적으로 가짜 이벤트를 저장하는 것이다. 물론 이러한 이벤트는 이벤트 컨슈머가 무시해야 한다.

6.1.7 도메인 이벤트 발전시키기

  • 이벤트 소싱은 적어도 개념적으로는 이벤트는 영구 저장하지만 이는 양날의 검이다.
  • 정확성을 담보로 변경 감사 로그를 제공하여 애플리케이션이 애그리거트의 상태를 온전히 재구성할 수 있는 반면, 이벤트 구조는 시간이 흐름에 따라 계속 달라지기 때문에 새로운 문제가 생기기 마련이다.

이벤트 스키마

  • 이벤트 소싱에 기반한 애플리케이션의 스키마는 아래의 세 가지로 구성된다.
    • 하나 이상의 애그리거트로 구성
    • 각 애그리거트가 발생시키는 이벤트를 정의
    • 이벤트 구조를 정의

  • 표에 열거한 변경은 서비스 도메인이 점점 발전하면서 자연스럽게 일어남.
    • 서비스 요건이 바뀔수도 있고 담당 개발자가 도메인 모델을 개선시킬 수도 있음

업캐스팅을 통한 스키마 변환 관리

  • DB 스키마 변경은 보통 마이그레이션을 이용하여 처리. 즉, 변경된 새 스키마에 기존 데이터를 옮겨 담는 SQL 스크립트를 실행해서 옮김. 스키마 마이그레이션은 버전 관리 시스템에 저장하고 플라이웨이 등의 툴을 써서 DB에 반영한다.
  • 이벤트 소싱 애플리케이션도 하위 호환이 안 되는 변경을 비슷한 방법으로 처리할 수 있지만, 이벤트 소싱 프레임워크가 이벤트 저장소에서 이벤트를 로드할 때 바꾸어 준다. 보통 업캐스터라고 하는 컴포넌트가 개별 이벤트를 구 버전에서 신 버전으로 업데이트하므로 애플리케이션 코드는 현재 이벤트 스키마를 잘 처리하기만 하면 된다.

6.1.8 이벤트 소싱의 장단점

장점

  • 도메인 이벤트를 확실하게 발행
    • 애그리거트 상태가 변경될 때마다 확실히 이벤트를 발행
  • 애그리거트 이력 보존
    • 애그리거트마다 전체 이력이 그대로 보존되고, 애그리거트 과거 상태를 임시 쿼리로 쉽게 조회할 수 있다. 예를 들어 어떤 고객이 과거 특정 시점에 신용 한도가 얼마였는지 쉽게 계산할 수 있음.
  • O/R 임피던스 불일치 문제를 거의 방지
    • 이벤트 소싱은 이벤트를 취합하는 대신 저장한다. 이벤트는 보통 쉽게 직렬화할 수 있는 단순한 구조. 서비스는 과거 상태를 죽 나열해서 복잡한 애그리거트 스냅샷을 뜰 수 있음. 즉, 애그리거트와 애그리거트를 직렬화한 표현형 사이를 한 수준 더 간접화(indirection)할수 있다.
  • 개발자에게 타임머신 제공

단점

  • 새로운 프로그래밍 모델을 배우려면 시간이 걸린다.
  • 메시징 기반 애플리케이션의 복잡함.
    • 메시지 브로커가 적어도 1회 이상 전달하기 때문에 멱등하지 않은 이벤트 핸들러는 중복 이벤트를 감지해 걸러내야 한다.
  • 이벤트를 발전시키기 어렵다.
    • 이벤트 스키마가 발전이 됨으로써 생기는 문제. 이벤트는 영구저장 되므로 애그리거트는 각 스키마 버전별로 이벤트를 폴드해야하는데, 그러다 보면 버전마다 분기 처리하는 코드로 애그리거트가 도배될 가능성이 있따. 이 문제를 해결하는 방법은 이벤트를 이벤트 저장소에서 가져올 때 최신 버전으로 업그레이드 하는 것(6.1.7). 그러면 애그리거트에 있는 이벤트를 업그레이드하는 코드를 따로 분리할 수 있고, 애그리거트는 최신 버전의 이벤트만 적용하면 된다.
  • 데이터를 삭제하기 어렵다
    • 보통 데이터는 소프트 삭제를 사용한다. 이에 따라서 발생하는 회원의 개인정보 관련 이슈가 발생할 수 있음.
      • 이를 해결하기 위해 개인정보는 암호화를 한 뒤 DB에 저장한다.
  • 이벤트 저장소를 쿼리하기 어렵다.
    • 신용 한도가 소진된 고객을 찾는다고 가정. 신용 한도를 가리키는 컬럼은 따로 없기 때문에 SELECT * FROM CUSTOMER WHERE CREDIT_LIMIT = 0 같은 쿼리는 사용할 수 없다.
    • 그래서 처음에 신용 한도를 설정한 이후 업데이트한 이벤트를 폴드해서 신용 한도를 계산할 수 밖에 없는데, SELECT 문이 중첩된 매우 복잡하고 비효율적인 쿼리를 사용하게 됨. 심지어 NoSQL 이벤트 저장소는 대부분 기본키 검색만 지원하므로 CQRS 방식으로 쿼리를 지원해야 한다.

6.2 이벤트 저장소 구현

  • 이벤트 저장소는 DB와 메시지 브로커를 합한 것. 애그리거트의 이벤트를 기본키로 삽입/조회하는 API가 있어 마치 DB처럼 움직이면서, 이벤트를 구독하는 API도 있어서 메시지 브로커처럼 동작하기도 한다.
  • 이벤트 스토어, 라곰, 액손, 이벤추에이트 등 이벤트 저장소 프레임워크가 존재

6.2.1 이벤추에이트 로컬 이벤트 저장소의 작동 원리

이벤추에이트 로컬의 이벤트 DB 스키마

  • events: 이벤트를 저장
  • entities: 엔티티당 로우 하나
  • snapshots: 스냅샷을 저장
create table events(
	event_id varchar(1000) PRIMARY KEY,
    event_type varchar(1000) NOT NULL,
    event_data varchar(1000) NOT NULL,
    entity_type varchar(1000) NOT NULL,
    entity_id varchar(1000) NOT NULL,
    triggering_event varchar(1000)
);
  • triggering_event는 중복 이벤트/메시지를 발견하는 용도의 컬럼. 이벤트를 생성한 메시지/이벤트의 ID를 여기에 저장한다.
create table entities(
	entity_type varchar(1000),
    entity_id varchar(1000),
    entity_version varchar(1000) NOT NULL,
    PRIMARY KEY(entity_type, entity_id)
);
  • entities 테이블은 엔티티별 현재 버전을 저장. 낙관적 잠금을 구현하는 용도로 쓰인다.
  • 엔티티가 생성되면 이 테이블에 한 로우가 삽입되고, 엔티티가 업데이트될 때마다 entity_version 컬럼도 업데이트 된다.
create table snapshots(
	entity_type varchar(1000),
    entity_id varchar(1000),
    entity_version varchar(1000) NOT NULL,
    snapshot_type varchar(1000) NOT NULL,
    snapshot_json varchar(1000) NOT NULL,
    triggering_event varchar(1000)
    PRIMARY KEY(entity_type, entity_id, entity_version)
);
  • 스냅샷 엔티티는 entity_type과 entity_id 컬럼으로 지정한다. snopshot_json 컬럼은 스냅샷을 직렬화한 표현형이고 snapshot_type은 그 타입을 가리킨다.
  • 이 스키마는 find(), create(), update() 3개의 작업을 지원한다.
    • find(): snapshot 테이블에서 가장 최근 스냅샷을 조회한 후, 스냅샷이 존재하면 events 테이블을 뒤져 event_id가 스냅샷의 entity_version보다 크거나 같은 이벤트를 모두 찾고, 스냅샷이 존재하지 않으면 주어진 엔티티의 이벤트를 모두 조회한다. 또 entity 테이블에서 엔티티 현재 버전을 가져온다.
    • create(): entities 테이블에 새 로우를 삽입하고 events 테이블에는 이벤트를 삽입한다.
    • update(): events 테이블에 이벤트를 삽입한다. 그 다음 UPDATE 문으로 entities 테이블에 있는 엔티티 버전을 업데이트해서 낙관적 잠금 체크를 수행한다.

이벤추에이트 로컬의 이벤트 브로커를 구독하여 이벤트를 소비

  • 서비스는 카프카로 구현된 이벤트 브로커를 구독해서 이벤트를 소비한다. 토픽은 파티셔닝된 메시지 채널이라서, 컨슈머는 메시지 순서를 유지한 상태로 수평 확장을 할 수 있음. 애그리거트 ID를 파티션 키로 사용하기 때문에 애그리거트가 발행한 이벤트 순서가 보존된다. 서비스는 토픽을 구독하여 애그리거트의 이벤트를 소비한다.

이벤추에이트 로컬 이벤트 릴레이가 이벤트를 DB에서 메시지 브로커로 전파

  • 이벤트 릴레이: DB에 삽입된 이벤트를 이벤트 브로커로 전파. 보통 트랜잭션 로그 테일링을 이용하고 다른 DB를 폴링하기도 한다.
    • 예를 들어 마스터/슬레이브 복제 프로토콜을 사용하는 MySQL 버전의 이벤트 릴레이라면 자신이 슬레이브인 것 처럼 MySQL 서버에 접속하여 binlog(MySQL의 업데이트 기록)을 읽는다. EVENTS 테이블에 이벤트가 삽입되면 해당 아파치 카프카 토픽으로 발행된다. 다른 종류의 변경은 이벤트 릴레이가 무시
    • Stand Alone 프로세스로 배포된다. 정확하게 재시작하기 위해 주기적으로 binlog에서 현재 위치(파일명, 오프셋)을 읽어 아파치 카프카 전용 토픽에 저장한다. 시동 시 이벤트 릴레이는 토픽에서 가장 마지막에 기록된 위치를 조회한 후, 해당 위치에서 MySQL binlog를 읽어 들이기 시작한다.

6.2.2 자바용 이벤추에이트 클라이언트 프레임워크

애그리거트 정의: ReflectiveMutableCommandProcessingAggregate 클래스

// 예제 6-3 이벤추에이트 버전의 Order 클래스
public class Order extends ReflectiveMutableCommandProcessingAggregate<Order, OrderCommand>{
	public List<Event> process(CreateOrderCommand command) { ... }
    public void apply(OrderCreatedEvent event) { ... }
    
}
  • 두 타입 매개변수 (애그리거트 구상 클래스, 애그리거트 커맨드 클래스의 상위 클래스)를 받는 제네릭 클래스
  • 리플렉션을 이용하여 커맨드 및 이벤트를 적절한 메서드에 디스패치한다. 커맨드는 process()에, 이벤트는 apply()에 각각 디스패치된다.

애그리거트 커맨드 정의

public interface OrderCommand extends Command{
}

public class CreateOrderCommand implements OrderCommand { ... }
  • 애그리거트 커맨드 클래스는 주어진 애그리거트 의 기초 인터페이스를 상속해야 한다. 그래서 Order 애그리거트 커맨드도 위와 같이 OrderCommand를 상속한다.
  • OrderCommand 인터페이스는 Command 인터페이스를 상속하며, OrderCommand 인터페이스는 CreateOrderCommand 커맨드 클래스가 구현된다.

도메인 이벤트 정의

interface OrderEvent extends Event{
}

public class OrderCreated implements OrderEvent { ... }
  • 애그리거트 이벤트 클래스는 메서드가 하나도 없는 마커 인터페이스 Event를 상속
  • 이벤트 클래스 OrderCreated는 Order 애그리거트의 이벤트 클래스용 기초 인터페이스 OrderEvent를 구현한다. OrderEvent는 다시 Event를 상속한다.

AggregateRepository 클래스로 애그리거트 생성, 검색, 수정

  • AggregateRepository는 애그리거트 클래스, 애그리거트의 기초 커맨드 클래스를 타입 매개변수로 받는 제네릭 클래스. 이 클래스에는 다음 세 메서드가 오버로드 되어있다.
    • save(): 애그리거트를 생성한다. save()는 커맨드 객체를 매개변수로 받아 다음과 같은 일을 한다.
      • 기본 생성자로 애그리거트 인스턴스를 만든다.
        • process()를 호출하여 커맨드를 처리한다.
        • apply()를 호출하여 생성된 이벤트를 적용한다.
        • 생성된 이벤트를 이벤트 저장소에 저장한다.
    • find(): 애그리거트를 검색한다.
    • update(): 애그리거트를 수정한다. 애그리거트 ID, 커맨드를 받아 다음과 같은 일을한다.
      • 이벤트 저장소에서 애그리거트를 조회한다.
      • process()를 호출하여 커맨드를 처리한다.
      • apply()를 호출하여 생성된 이벤트를 적용한다.
      • 생성된 이벤트를 이벤트 저장소에 저장한다.
  • AggregateRepository는 외부 요청에 대한 응답으로 애그리거트를 생성/수정하는 서비스에 주로 쓰인다.
// 예제 6-4 OrderService는 AggregateRepository를 사용한다.
public class OrderService{
	private AggregateRepository<Order, OrderCommand> orderRepository;
    
    public OrderService(AggregateRepository<Order, OrderCommand> orderRepository){
    	this.orderRepository = orderRepository;
    }

	public EntityWithIdAndVersion<Order> createOrder(OrderDetails orderDetails){
    	return orderRepository.save(new CreateOrder(orderDetails));
    }
}
  • Order용 AggregateRepository가 OrderService에 주입되고, createOrder()는 CreateOrder 커맨드를 orderRepository.save()에 넘겨 호출한다.

도메인 이벤트 구독

// 예제 6-5 CreditReservedEvent 핸들러
@EventSubscriber(id="orderServiceEventHandlers")
public class OrderServiceEventHandlers{
	
    @EventHandlerMethod
    public void creditReserved(EventHandlerContext<CreditReserved> ctx{
    	CreditReserved event = ctx.getEvent();
    }
}
  • @EventSubscriber로 이벤트를 처리할 스프링 빈을 저장한다.
  • @EventHandlerMethod는 creditReserved()를 이벤트 핸들러로 식별하는 어노테이션.
  • 이벤트 핸들러는 이벤트 및 관련 메타데이터가 포함된 EventHandelerContext형 매게변수를 받는다.

6.3 사가와 이벤트 소싱을 접목

  • 코레오그래피 사가에서는 이벤트 소싱을 쉽게 접목할 수 있으나, 오케스트레이션 기반의 사가에 접목하는 것은 어려울 수 있다. 이벤트 저장소의 트랜잭션 개념이 상당히 제한적이기 때문이다.
    • 이벤트 저장소를 사용하는 애플리케이션은 애그리거트 하나만 생성/수정하고 결과 이벤트를 발행할 수 있는데, 사가의 각 단계는 다음과 같이 반드시 원자적으로 수행되어야 한다.
      • 사가 생성: 사가를 시작한 서비스는 원자적으로 애그리거트를 생성/수정하고 사가 오케스트레이터를 생성해야 한다. 예를 들어 주문 서비스의 createOrder()Order 애그리거트와 CreateOrderSaga를 생성해야 한다.
      • 사가 오케스트레이션: 사가 오케스트레이터는 원자적으로 응답을 소비하고, 자신의 상태를 업데이트 한 후 커맨드 메시지를 전송해야 한다.
      • 사가 참여자: 주방 서비스, 주문 서비스 등 사가 참여자는 원자적으로 메시지를 소비하고, 중복 메시지를 솎아내고, 애그리거트를 생성/수정하고, 응답 메시지를 전송해야 한다
    • 이처럼 이벤트 저장소의 트랜잭션 능력과 요건 사이에 맞지 않는 부분이 있기 때문에 오케스트레이션 사가와 이벤트 소싱을 연계하는 작업은 어렵다.

6.3.1 코레오그래피 사가 구현: 이벤트 소싱

  • 이벤트 소싱과 코레오그래피 사가는 찰떡궁합. 이벤트 소싱은 메시징 기반의 IPC, 메시지 중복 제거, 원자적 상태 업데이트와 메시지 전송 등 사가가 필요로 하는 여러가지 메커니즘을 제공한다.
  • 하지만 사가 코레오그래피에 이벤트를 사용하면 이벤트의 목적이 이원화가 된다. 이벤트 소싱은 상태 변화를 나타내기 위해 이벤트를 이용하는데, 이벤트를 사가 코레오그래피에 갖다 쓰면 애그리거트는 상태 변화가 없어도 무조건 이벤트를 발생시켜야 한다.
    • 애그리거트를 업데이트하면 비즈니스 규칙에 위배될 경우 애그리거트는 반드시 이벤트를 발생시켜 오류를 보고해야 한다. 더 큰 문제는 사가 참여자가 애그리거트를 생성할 수 없을 경우다. 에러 이벤트를 발생시킬 애그리거트가 하나도 없기 때문이다.
  • 이러한 문제가 있어서 복잡하더라도 오케스트레이션 사가를 구현하는 것이 최선이다.
    • 사가 오케스트레이터는 일부 서비스 메서드에 의해 생성된다. 가령 OrderService.createOrder() 같은 서비스는 애그리거트를 생성/수정하고 사가 오케스트레이터를 생성하는 두 가지 일을 한다.
    • 서비스는 이 두 가지 액션을 첫 번째 액션이 수행되면 두 번째 액션은 최종적으로 실행되는 방식으로 수행한다. 두 액션이 서비스에서 반드시 수행되도록 보장하는 방법은 이벤트 저장소의 종류마다 다르다.

사가 오케스트레이터 작성: RDBMS 이벤트 저장소 사용 서비스

class OrderService

@Autowired
private SagaManager<CreateOrderSagaState> createOrderSagaManager;

@Transactional // createOrder()가 DB 트랜잭션 내에서 실행되도록 한다.
public EntityWithAndVersion<Order> createOrder(OrderDetails orderDetails){
	EntityWithAndVersion<Order> order = orderRepository.save(new CreateOrder(orderDetails)); // Order 애그리거트를 생성하고 process와 apply를 호출하여 이벤트를 처리한다.
    
    CreateOrderSagaState data = new CreateOrderSagaState(order.getId(), orderDetails) // CreateOrderSaga를 생성한다.
    
	createOrderSagaManager.create(data, Order.class, order.getId());
    
    return order;
}
  • 예제 6-4의 OrderService와 4장에서 설명한 OrderService를 조합한 형태. 이벤추에이트 로컬은 RDBMS를 사용하므로 이벤추에이트 트램 사가 프레임워크와 동일한 ACID 트랜잭션에 참여할 수 있다. 하지만 NoSQL 이벤트 저장소를 사용하는 서비스는 사가 오케스트레이터를 생성하는 것이 이렇게 간단하지 않다.

사가 오케스트레이터 작성: NoSQL 이벤트 저장소 사용 서비스

  • NoSQL 이벤트 저장소를 사용하는 서비스는 이벤트 저장소를 업데이트하고 사가 오케스트레이터를 생성하는 액션을 원자적으로 수행하기 어렵다.
  • 그래서 서비스가 애그리거트가 발생시킨 도메인 이벤트에 반응하여 사가 오케스트레이터를 생성하는 이벤트 핸들러를 갖고 있어야 한다.

업로드중..

  • 위의 그림은 주문 서비스가 OrderCreatedEventHandler로 CreateOrderSaga를 생성하는 과정이다.
    • 주문 서비스가 Order 애그리거트를 만들어 이벤트 저장소에 저장하면, 이벤트 저장소는 OrderCreated 이벤트를 발행하고 이벤트 핸들러는 이 이벤트를 소비한다. 이벤트 핸들러는 이벤추에이트 트램 사가 프레임워크를 호출하여 CreateOrderSaga를 생성한다.
  • 사가 오케스트레이터를 생성하는 이벤트 핸들러를 작성할 때 주의점은 중복 이벤트를 처리해야 한다. 적어도 한 번은 메시지를 전달하기 때문에 사가를 생성하는 이벤트 핸들러가 여러 번 호출될 수 있다. -> 즉 멱등성을 구현해야 한다.
    • 가장 쉬운 방법은 이벤트의 유일한 속성에서 사가 ID를 추출하는 것이다. 방법은 두 가지가 있다.
      • 이벤트를 발생시킨 애그리거트 ID를 사가 ID로 쓴다: 애그리거트가 생성 이벤트에 반응하여 생성되는 사가에 적합하다.
        • 이벤트 ID를 사가 ID로 쓴다: 중복 이벤트라면 이벤트 핸들러가 사가를 생성 시도할 때 해당 ID가 이미 존재할테니 실패할 것이다. 동일한 사가 인스턴스가 여럿 존재할 가능성이 있을 때 괜찮은 방법이다.

6.3.3 이벤트 소싱 기반의 사가 참여자 구현

  • 구현하면서 두 가지 이슈를 처리해야 한다.
    • 커맨드 메시지를 멱등하게 처리
    • 응답 메시지를 원자적으로 전송

커맨드 메시지를 멱등하게 처리

  • 바로 위에서 설명함.

응답 메시지를 원자적으로 전송

  • 사가 커맨드 핸들러가 애그리거트를 생성/수정할 때, 애그리거트가 발생시킨 진짜 이벤트와 가짜 이벤트 SagaReplyRequested를 모두 이벤트 저장소에 저장한다.
  • SagaReplyRequested 이벤트 핸들러는 이벤트에 포함된 데이터로 응답 메시지를 만들어 사가 오케스트레이터 응답 채널에 출력한다.

예제: 이벤트 소싱 기반의 사가 참여자

업로드중..

  • 1) 주문 생성 사가가 계좌 인증 커맨드를 매시징 채널을 통해 회계 서비스로 보낸다. 이벤추에이트 사가 프레임워크의 SagaCommandDispather는 AccountingServiceCommandHandelr를 호출하여 커맨드 메시지를 처리한다.
  • 2) AccountingServiceCommandHandelr를는 주어진 Accounting 애그리거트를 커맨드로 전송한다.
  • 3) 애그리거트가 AcountAuthorizedEvent와 SagaReplyRequestedEvent를 발생시킨다
  • 4) SagaReplyRequested 이벤트 핸들러는 주문 생성 사가에 응답 메시지를 전송하여 SagaReply RequestedEvent를 처리한다.
예제 6-6 사가가 전송한 커맨드 메시지를 처리한다.
public class AccountingServiceCommandHandler {

  @Autowired
  private AggregateRepository<Account, AccountCommand> accountRepository;
  
   public void authorize(CommandMessage<AuthorizeCommand> cm) {

    AuthorizeCommand command = cm.getCommand();

    accountRepository.update(Long.toString(command.getConsumerId()),
            makeAuthorizeCommandInternal(command),
            replyingTo(cm)
                    .catching(AccountDisabledException.class, () -> withFailure(new AccountDisabledReply()))
                    .build());

  }
}
  • authorize()는 AggregateRepository를 호출하여 Account 애그리거트를 업데이트 한다. update()에 전달된 세 번째 인수 UpdateOptions는 다음 표현식으로 계산이 된다.
// UpdateOptions
replyingTo(cm)
   .catching(AccountDisabledException.class, 
   () -> withFailure(new AccountDisabledReply()))
   .build());
  • 프레임워크 관련해서 설명한 것으로 보여 자세한 설명은 생략

6.3.4 사가 오케스트레이터 구현: 이벤트 소싱

  • 구현하면서 세 가지의 설계 이슈를 생각해야 한다.
    • 사가 오케스트레이터를 어떻게 저장할 것인가?
    • 어떻게 오케스트레이터 상태를 원자적으로 변경하고 커맨드 메시지를 전송할 것인가?
    • 어떻게 사가 오케스트레이터가 정확히 한 번만 메시지를 응답하게 만들 것인가?

이벤트 소싱으로 사가 오케스트레이터를 저장

  • 사가는 다음 이벤트를 이용하여 저장할 수 있다.
    • SagaOrchestratorCreated: 사가 오케스트레이터가 생성됨.
    • SagaOrchestratorUpdated: 사가 오케스트레이터가 수정됨.

커맨드 메시지를 확실하게 전송

업로드중..

  • 1) 사가 오케스트레이터가 전송하려는 각 커맨드마다 SagaCommandEvent를 발생 시킨다. 여기에는 목적지 채널, 커맨드 객체 등 커맨드 전송에 필요한 데이터가 모두 담겨있고 이벤트 저장소에 저장된다.
  • 2) 이벤트 핸들러는 SagaCommandEvent 처리 후 커맨드 메시지를 목적지 메시지 채널로 보낸다.
  • 이렇게 두 단계로 처리하니 적어도 1회 이상은 커맨드가 전송될 것이고, 동일한 이벤트를 받아 이벤트 핸들러가 호출될 수 있는 구조이기 때문에 SagaCommandEventHandler가 중복된 커맨드 메시지를 전송할 수 있다. 하지만 SagaCommandEvent의 ID를 커맨드 메시지 ID로 사용하면 중복 메시지는 동일한 ID를 가지게 되므로 사가 참여자는 쉽게 중복 메시지를 걸러낼 수 있다.

응답을 꼭 한 번만 처리

  • 사가 오케스트레이터 역시 앞서 설명한 방법으로 중복된 응답 메시지를 솎아 낼 필요가 있다. 오케스트레이터가 응답 메시지 ID (응답을 처리할 때 오케스트레이터가 발생시킬) 이벤트에 보관하면 어느 메시지가 중복인지 쉽게 분간할 수 있다.
profile
평범한 백엔드 개발자

2개의 댓글

comment-user-thumbnail
2023년 11월 14일

즐겁게 읽었습니다. 유용한 정보 감사합니다.

답글 달기
comment-user-thumbnail
2024년 9월 16일

저도 잘읽었습니다. 정리 감사해요

답글 달기