스프링 이벤트

appti·2024년 5월 29일
0

분석

목록 보기
25/25

옵저버 패턴

스프링 이벤트는 디자인 패턴 중 옵저버 패턴을 구현한 것입니다.
그러므로 스프링 이벤트를 이해하고 적절히 사용하기 위해서는 옵저버 패턴에 대해서도 알아볼 필요가 있습니다.

설명

옵저버 패턴은 어느 한 객체의 상태가 변경되면 이 객체에 의존하고 있는 모든 객체가 자동으로 알림을 받는 패턴을 의미합니다.

옵저버 패턴의 구성 요소는 Observable, Observer로 나눌 수 있습니다.
Observable은 의존 대상이 되는 피관찰자이며 Subject, Publisher, EventEmitter 등으로 표현됩니다.
Observer는 의존하고 있는 관찰자이며 Subscriber, Consumer, EventListener 등으로 표현됩니다.

Observable은 여러 Observer를 등록할 수 있습니다. 즉, Observable과 Observer는 일대다의 관계를 가집니다.

외부로 인해 Observable의 상태가 변경되면, Observable은 자신에게 등록된 모든 Observer에게 이를 알리게 되며, 이를 notify라고 표현합니다.

notify의 경우 다음과 같은 방식으로 호출됩니다.

  • Observable에서 변경이 발생했을 때, Observable을 변경하는 주체(외부에서 호출하는 메서드)가 notify 호출
  • 사용자가 Observable 변경 후 직접 nofity 호출
    • main 등에서 Observable 변경 후, Observer에 직접 notify 호출

구현 방식

옵저버 패턴의 구현 방식은 다음과 같습니다.

  • 동기 블로킹 옵저버 패턴
    • Observer와 Observable이 동일한 스레드에서 실행됩니다.
    • Observable은 모든 Observer에게 이벤트를 전파할 때 까지 블로킹됩니다.
    • Observer와 Observable의 결합도는 낮추면서, 순차적으로 실행해야 할 경우 사용합니다.
  • 비동기 논블로킹 옵저버 패턴
    • Observer와 Observable이 서로 다른 스레드에서 실행됩니다.
    • Observable이 모든 Observer에게 이벤트를 전파할 때 까지 블로킹되지 않습니다.
    • Observer와 Observable의 결합도를 낮출 뿐만 아니라, 실행 순서를 고려할 필요가 없을 경우 사용합니다.
  • 교차 프로세스 옵저버 패턴
    • 단일 애플리케이션이 아닌 다양한 애플리케이션에서 이벤트를 처리하는 방식입니다.
    • Observer와 Observable의 연관 관계를 완전히 분리할 수 있습니다.
    • 주로 메세지 기반의 방식을 사용합니다.

사용 목적

디자인 패턴의 목표는 코드의 결합도를 낮추기 위함입니다.
옵저버 패턴이 속한 행동 디자인 패턴은 서로 다른 행동을 하는 코드의 결합도를 낮추는 것이 목표입니다.
옵저버 패턴은 Observer의 코드와 Observable의 코드의 결합도를 낮추기 위한 패턴입니다.

그러므로 옵저버 패턴을 적용하고자 한다면, Observer와 Observable의 관계를 잘 파악하고 현재 애플리케이션의 상황이 코드가 복잡해지는 것을 감수하더라도 코드의 결합도를 낮춰 이득을 얻을 수 있을지 심사숙고 해야 합니다.

스프링 이벤트

스프링 이벤트란 애플리케이션 내에서 발생하는 특정 이벤트나 상황을 정의하고, 이에 대한 처리를 지정할 수 있는 기능입니다.

이전에 설명한 것과 같이, 옵저버 패턴을 기반으로 동작합니다.

주로 다음과 같은 경우에 사용됩니다.

  • 여러 도메인에서 공통적으로 사용되는 로직인 경우
  • 서로 다른 코드로 분리될만큼 응집도가 떨어지는 로직인 경우
  • 명확한 로직 처리 흐름이 없는 경우

다음과 같은 경우에는 사용하지 않을 것을 권장합니다.

  • 응집도가 높은 로직인 경우
  • 명확한 로직 처리 흐름이 있는 경우

구성 요소

스프링 이벤트의 핵심 요소는 ApplicationListener와 ApplicationEventPublisher입니다.

ApplicationEventPublisher를 통해 이벤트를 발행하고, 이벤트를 처리하기 위해 ApplicationListener가 특정 이벤트를 구독하는 식으로 동작합니다.

ApplicationEventPublisher

ApplicationEventPublisher는 이벤트를 발행하는 역할을 수행합니다.
스프링 이벤트를 사용하기 위해서는 반드시 ApplicationEventPublisher를 주입받아야 합니다.

ApplicationEventPublisher는 옵저버 패턴에서 Observable, Publisher에 해당합니다.

실제로 주입되는 ApplicationEventPublisher는 ApplicationContext 입니다.

ApplicationListener

ApplicationListener는 특정 이벤트를 구독하고, 이벤트가 발행되면 그에 맞는 로직을 수행합니다.

ApplicationListener는 옵저버 패턴에서 Observer, Subscriber에 해당합니다.

ApplicationEventMulticaster

ApplicationEventMulticaster 이벤트가 발생했을 때 이를 AppliationListener에 알리는(= notify를 수행하는) 주체입니다.

SimpleApplicationEventMulticaster가 기본 구현체이며, ApplicationContext refresh 과정 중 사용되는 ApplicationEventMulticaster 또한 SimpleApplicationEventMulticaster 입니다.

@EventListener

@EventListener는 ApplicationListener 기반의 애플리케이션 이벤트를 메서드 레벨로 등록하기 위한 애노테이션 입니다.

ApplicationListener를 등록하는 다른 방법으로 ApplicationListener 인터페이스를 직접 구현해 등록하는 방법도 있지만, @EventListener가 간편하기 때문에 주로 이 방법을 사용합니다.

@EventListener를 사용하는 경우, BeanFactoryPostProcessor 중 하나인 EventListenerMethodProcessor에 의해 메서드를 별도의 ApplicationListener 구현체로 변환합니다.

동작 방식

@Service
@RequiredArgsConstructor
public class UserService {
    
    private final ApplicationEventPublisher publisher;
    
    public void join(JoinUserRequest request) {
        serviceLogic();
        
        publisher.publishEvent(new JoinUserEvent(user));
    }
}

@Component
public class JoinUserEventListener {

    @EventListener
    public void listenJoinEvent(JoinUserEvent event) {
        eventLogic();
    }
}

이렇게 추가한 스프링 이벤트는 다음과 같이 동작합니다.

  • 멀티 캐스팅 방식으로 동작
  • 동기 블로킹

ApplicationEventMulticaster는 이름 그대로 등록한 모든 ApplicationListener 중 조건이 맞는 모든 ApplicationEventMulticaster에게 이벤트를 전파하기 때문에 멀티 캐스팅 방식으로 동작합니다.


스프링 이벤트의 동작 과정을 그림으로 표현하면 다음과 같습니다.

UserService부터 시작된 동작이 JoinUserEvent.listenJoinEvent()를 호출할 때 까지 동기적이며 순차적으로 실행됩니다.

예외 공통 처리

구성 요소 ApplicationEventMulticaster의 구현체 SimpleApplicationEventMulticaster에는 예외를 중앙에서 처리할 수 있는 ErrorHandler를 지정할 수 있습니다.

이렇게 지정한 ErrorHandler는 EventListener를 호출하는 invokeListener에서 예외를 처리하는데 사용됩니다.

주의사항

트랜잭션 전파

@Service
@RequiredArgsConstructor
public class UserService {
    
    private final ApplicationEventPublisher publisher;
    
    @Transactional
    public void join(JoinUserRequest request) {
        serviceLogic();
        
        publisher.publishEvent(new JoinUserEvent(user));
    }
}

@Component
public class JoinUserEventListener {

    @Transactional
    @EventListener
    public void listenJoinEvent(JoinUserEvent event) {
        eventLogic();
    }
}

옵저버 패턴을 내부적으로 구현한 스프링 이벤트의 특성 상, 완전히 분리되었다고 생각할 수 있습니다.

그래서 이벤트를 발행하는 쪽(UserService)과 발생한 이벤트에 따라 로직을 수행하는 쪽(JoinUserEvent) 양쪽에 @Transactional을 붙이는 경우가 있습니다.

하지만 동기 블로킹 방식으로 동작하는 만큼, 굳이 @Transactional을 붙여주지 않아도 자연스럽게 JoinUserEvent가 UserService의 트랜잭션에 참여하게 됩니다.

@Service
@RequiredArgsConstructor
public class UserService {
    
    private final ApplicationEventPublisher publisher;
    
    @Transactional
    public void join(JoinUserRequest request) {
        serviceLogic();
        
        try {
            publisher.publishEvent(new JoinUserEvent(user));
        } catch (RuntimeException e) {
            // 예외 처리
        }
    }
}

@Component
public class JoinUserEventListener {

    @Transactional
    @EventListener
    public void listenJoinEvent(JoinUserEvent event) {
        eventLogic(); // 예외 발생
    }
}

특히 위와 같이 별도로 @Transactional을 붙인 상태에서 JoinUserEvent에서 예외가 발생한 경우, UserService에서 예외 처리를 하더라도 트랜잭션 전파로 인해 롤백 마킹이 되면서 예외 처리와 무관하게 트랜잭션이 깨지게 됩니다.

이처럼 예상치 못한 롤백이 발생하거나 불필요한 트랜잭션 AOP 과정이 추가되기 때문에 @Transactional을 삭제하거나, 트랜잭션 전파를 고려하며 코드를 작성해야 합니다.

이벤트 로직의 작업 시간이 긴 경우

스프링 이벤트는 동기 블로킹으로 진행되기 때문에, 이벤트 로직의 작업 시간이 긴 경우 작업이 모두 끝날 때 까지 대기해야 합니다.

특히 스프링 이벤트의 경우 알림, 메일 발송과 같이 여러 도메인에서 공통적으로 사용될 수 있으며, 두 영역 간의 결합도가 낮은 만큼 한 쪽의 로직 수행으로 인해 다른 쪽의 로직이 지연되는 것은 바람직하지 않을 것입니다.

이를 해결하기 위해 비동기 논블로킹 방식으로 변경할 수 있습니다.

주로 @Async 애노테이션을 사용하지만, ApplicationEventMulticaster를 직접 구현해 비동기적으로 이벤트 처리를 수행하는 코드를 명시할 수도 있습니다.

어떤 방식으로 비동기 논블로킹을 적용하든, 각 영역은 별도의 스레드에서 수행되기 때문에 이전과 달리 트랜잭션이 물리적으로 분리되는 것을 주의해야 합니다.

@TransactionalEventListener

@TransactionalEventListener는 트랜잭션 이벤트를 처리하기 위한 EventListener 관련 애노테이션입니다.

@EventListener가 구현하는 ApplicationListener를 확장한 TransactionalApplicationListener를 구현하게 됩니다.

구성 요소

@TransactionalEventListener의 경우 TransactionPhase에 따라 이벤트가 트리거됩니다.
그러므로 일반적인 @EventListener에서 사용하는 ApplicationEventMulticaster 뿐만 아니라, 트랜잭션과 관련된 요소들을 사용합니다.

TransactionPhase

TransactionPhase는 @TransactionalEventListener를 트리거하는 트랜잭션 이벤트입니다.

다음과 같은 종류가 있습니다.

  • TransactionPhase.AFTER_COMMIT
    • 기본 값입니다.
    • 참여한 트랜잭션이 커밋되면 이벤트가 트리거됩니다.
  • TransactionPhase.AFTER_ROLLBACK
    • 참여한 트랜잭션이 롤백되면 이벤트가 트리거됩니다.
  • TransactionPhase.AFTER_COMPLETION
    • 참여한 트랜잭션이 완료(COMPLETION)되면 이벤트가 트리거됩니다.
  • TransactionPhase.BEFORE_COMMIT
    • 참여한 트랜잭션이 커밋되기 직전에 이벤트가 트리거됩니다.
      • 데이터베이스에 변경 사항이 모두 반영되었지만, 물리적으로 커밋하기 직전에 이벤트가 트리거됩니다.

AFTER_ROLLBACK의 경우 위와 같이 AFTER_COMPLETION과 동일한 절차를 통해 수행된다고 표시되어 있습니다.

TransactionalApplicationListenerSynchronization.afterTransaction()에서 위 구간이 AFTER_ROLLBACK으로 인해 트리거된다고 볼 수 있습니다.

그러므로 명시적으로 AFTER_COMMIT을 트리거하는 구간은 존재하지 않습니다.

TransactionSynchronizationUtils

TransactionSynchronizationUtils는 트랜잭션 동기화 리소스를 등록하고 관리하는 역할을 수행합니다.

TransactionPhase에 맞춰 다양한 시점에 로직을 트리거하고 있음을 확인할 수 있습니다.

TransactionalApplicationListenerSynchronization

TransactionalApplicationListenerSynchronization는 TransactionPhase에 맞게 동기적으로 TransactionalApplicationListener를 호출하는 역할을 수행합니다.

TransactionSynchronizationUtils에서 호출하는 beforeCommit(), afterCompletion()이 정의되어 있습니다.

TransactionSynchronizationUtils에서 공통적으로 호출하는 processEventWithCallbacks()는 등록된 callback과 이벤트를 트리거합니다.

동작 방식

@Service
@RequiredArgsConstructor
public class UserService {
    
    private final ApplicationEventPublisher publisher;
    
    @Transactional
    public void join(JoinUserRequest request) {
        serviceLogic();
        
        publisher.publishEvent(new JoinUserEvent(user));
    }
}

@Component
public class JoinUserEventListener {

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void listenJoinEvent(JoinUserEvent event) {
        eventLogic();
    }
}

@TransactionalEventListener 또한 일반적인 @EventListener와 같이 동작합니다.

  • 멀티 캐스팅 방식으로 동작
  • 동기 블로킹

@EventListener의 경우 ApplicationListener.onApplicationEvent()를 호출하면 바로 이벤트가 트리거되었지만, @TransactionalEventListener의 경우 TransactionPhase에 따라 이벤트를 트리거해야하므로 ApplicationListener.onApplicationEvent()를 호출하더라도 이벤트가 트리거되지 않습니다.

대신 TransactionalApplicationListenerSynchronization를 초기화합니다.
초기화할 때, 이벤트를 트리거하기 위해 필요한 모든 내용을 세팅하게 됩니다.

이후 AbstractPlatformTransactionManager.commit()에서 커밋/롤백에 따라 TransactionPhase를 트리거하게 됩니다.

커밋/롤백에 따라 processCommit/processRollback()이 호출됩니다.

processCommit()의 경우 AFTER_COMMIT / BEFORE_COMMIT / AFTER_COMPLETION를 호출합니다.

processRollback()의 경우 AFTER_COMPLETION를 호출합니다.

triggerBeforeCompletion()의 경우 ResourceHolderSynchronization, DataSourceUtils, SpringSessionSynchronization과 같이 스프링 이벤트와 무관하게, 주로 DB 관련 리소스를 해제하는데 사용됩니다.

TransactionSynchronizationAdapter에서는 NO-OP 형식으로 오버라이딩 된 것을 확인할 수 있습니다.

예외 공통 처리

TransactionSynchronizationUtils에서 @TransactionalEventListener를 트리거할 때, try-catch로 예외를 단순 로그 출력만으로 처리하고 있음을 확인할 수 있습니다.

그렇기 때문에 이벤트를 트리거하는 TransactionalApplicationListenerSynchronization에서 해당 catch문에 의해 수행되는 Callback을 통해 예외를 공통적으로 처리해야 합니다.

하지만 SimpleApplicationEventMulticaster처럼 예외를 처리하기 위한 Callback 등록 API를 지원해주지 않기 때문에 TransactionalApplicationListenerSynchronization을 생성하는 팩토리인 TransactionalEventListenerFactory를 커스터마이징해 별도로 Callback을 등록하고, 이를 빈으로 등록한 뒤, 기존 TransactionalApplicationListenerSynchronization보다 우선 순위를 높게 설정하는 과정이 필요합니다.

public class CustomCallbackTransactionalEventListenerFactory extends TransactionalEventListenerFactory {

    private final List<SynchronizationCallback> callbacks;

    public CustomTransactionalEventListenerFactory(List<SynchronizationCallback> synchronizationCallbacks) {
        super();
        this.callbacks = synchronizationCallbacks;
    }

    @Override
    public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
        ApplicationListener<?> applicationListener = super.createApplicationListener(beanName, type, method);
        
        if (applicationListener instanceof TransactionalApplicationListener<?> listener) {
            callbackList.forEach(listener::addCallback);
        }
        
        return applicationListener;
    }
}

@Bean
public TransactionalEventListenerFactory customCallbackTransactionalEventListenerFactory(List<SynchronizationCallback> callbackList) {
    TransactionalEventListenerFactory factory = new CustomCallbackTransactionalEventListenerFactory(callbackList);
    
    factory.setOrder(ORDERED);
    
    return factory;
}

주의사항

TransactionPhase.AFTER_COMMIT 이벤트 로직 호출 시 DB에 반영되지 않음

@TransactionalEventListener는 이벤트를 호출한 메서드의 물리 트랜잭션이 커밋된 이후에 동작합니다.

이 때, 별도 처리가 없다면 이벤트 로직에서는 DB를 변경할 수 없습니다.
물리 트랜잭션은 단 한 번만 커밋이 가능하기 때문입니다.

만약 TransactionPhase.AFTER_COMMIT 시점에 동작하는 이벤트의 로직이 DB를 변경해야 한다면, 트랜잭션 전파 속성을 REQUIRES_NEW로 명시하거나, @Async를 통해 기존 물리 트랜잭션과 별도의 스레드에서 이벤트 로직을 수행하게 해야 합니다.

즉, 기존 비즈니스 로직을 수행하는 물리 트랜잭션과 이벤트 로직을 수행하는 물리 트랜잭션이 필요하다는 의미입니다.

이벤트 전달 시 JPA Entity를 그대로 사용하는 경우

@Service
@RequiredArgsConstructor
public class UserService {
    
    private final ApplicationEventPublisher publisher;
    
    @Transactional
    public void join(JoinUserRequest request) {
        serviceLogic();
        
        publisher.publishEvent(new JoinUserEvent(user));
    }
}

@Component
public class JoinUserEventListener {

    private final UserRepository userRepository;

    @Async
    public void listenJoinEvent(User user) {
        // 오래 걸리는 작업 수행
        user.changeData(data); // 오래 걸리는 작업의 결과를 토대로 변경
        
        userRepository.save(user);
    }
}

이 문제는 다음과 같은 상황에서만 문제가 발생합니다.

  • @Async로 인해 기존 물리 트랜잭션과 별도로 동작하는 경우
  • 기본 물리 트랜잭션에 비해 시간이 오래 걸리는 로직인 경우
  • detach된 엔티티를 사용해 명시적으로 JpaRepository.save()를 호출하는 경우
  • JpaRepository.save() 호출 전에 다른 요청으로 인해 동일한 엔티티의 데이터가 변경된 경우

JpaRepository.save()를 명시적으로 호출하기 이전 동일한 엔티티의 데이터가 변경되었다면 이벤트 트리거 시 전달한 엔티티는 유효하지 않은 데이터가 됩니다.

이 때 오래 걸리는 작업을 수행한 뒤, 그 결과를 엔티티에 저장하고 JpaRepository.save()를 호출하면 merge로 동작하게 되어 모든 필드가 덮어씌워지게 됩니다.

이로 인해 유효한 데이터와 유효하지 않은 데이터가 간헐적으로 섞이게 되는 현상이 발생합니다.

다만 JPA 사용 시 detach된 엔티티를 사용하는 것이 권장되지 않기 때문에 잘 발생하지는 않는 현상입니다.

@EventListener & @TransactionalEventListener

이벤트 트리거 시점

@Service
@RequiredArgsConstructor
public class UserService {
    
    private final ApplicationEventPublisher publisher;
    
    public void join(JoinUserRequest request) {
        serviceLogic();
        
        publisher.publishEvent(new JoinUserEvent(user)); // 호출 즉시 이벤트 트리거
    }
}

@Component
public class JoinUserEventListener {

    @EventListener
    public void listenJoinEvent(JoinUserEvent event) {
        eventLogic();
    }
}

@EventListener는 위와 같이 ApplicationEventPublisher.publishEvent()를 호출하면 이벤트 트리거까지 모두 수행됩니다.

@Service
@RequiredArgsConstructor
public class UserService {
    
    private final ApplicationEventPublisher publisher;
    
    @Transactional
    public void join(JoinUserRequest request) {
        serviceLogic();
        
        publisher.publishEvent(new JoinUserEvent(user)); // 호출 시 TransactionalApplicationListenerSynchronization만 초기화
    }
}

@Component
public class JoinUserEventListener {

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void listenJoinEvent(JoinUserEvent event) {
        eventLogic();
    }
}

@TransactionalEventListener는 트랜잭션 AOP가 모두 끝나고, 물리 트랜잭션을 커밋/롤백하는 과정에 이벤트가 트리거됩니다.

스프링 이벤트 + @Async

스프링 이벤트는 동기 블로킹 방식으로 동작하기 때문에 트랜잭션 전파, 작업 대기 시간 등 신경써야 할 부분이나 문제점들이 많습니다.

이런 것들을 해결하기 위한 대표적인 방법은 @Async를 활용한 비동기 처리로 변환하는 것입니다.

다음과 같은 특징을 가집니다.

  • @Async를 사용할 경우 스프링에서 제공해주는 비동기 동작 방식으로 동작하게 됩니다.
    • 이전에 살펴봤던 스프링 이벤트 동작 방식이 아닌, 비동기 동작 방식으로 동작합니다.
  • AsyncUncaughtExceptionHandler를 활용해 예외 공통 처리가 가능합니다.
  • 호출자와 별도의 스레드로 동작합니다.
    • 트랜잭션이 물리적으로 분리됩니다.

@Async를 활용해 스프링 이벤트를 비동기 논블로킹으로 사용하는 경우, 스프링 이벤트 취지에 맞게 코드가 Observer와 Observable로 분리할 수 있는지, 분리해서 얻게 되는 코드의 복잡함 이상으로 이득을 얻을 수 있는지를 고려해야 합니다.

또한 @Async 위주로 동작하므로, 동작 과정과 관련된 주의 사항은 스프링 이벤트와 관련된 내용보다는 @Async 동작 과정에 초점을 맞춰 개발을 진행해야 합니다.

profile
안녕하세요

0개의 댓글