Spring에서 Event를 사용하는 법

Minjae An·2024년 1월 21일
0

Spring Web

목록 보기
7/9

개요

알림 기능을 구현하기 위해 SSE(Server Sent Event)를 공부하던 와중 스프링에서는 이벤트라는 개념을 다루기 위해 어떤 지원하는 기능이 있는 지 궁금하여 찾아보게 되었다.

@RestController
@RequiredArgsConstructor
public class NotificationController {
    private final UserQueryService userQueryService;
    private final NotificationService notificationService;

    @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe(
            @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId,
            @RequestParam("userId") Long userId) {
        return notificationService.subscribe(userId, lastEventId);
    }

    @PostMapping("/notification/{receiverId}")
    public ResponseEntity<Void> notification(@PathVariable("receiverId") Long receiverId
            , @RequestBody CreateNotificationRequest request) {
        User receiver = userQueryService.findById(receiverId);
        notificationService.send(receiver, request.getNotificationType(),
                request.getContent(), request.getRelatedUrl());

        return ResponseEntity.ok().build();
    }

    @PostMapping("/notifications")
    public ResponseEntity<Void> notificationToAll(@RequestBody CreateNotificationRequest request) {
        List<User> users = userQueryService.findAll();
        users.forEach(user -> notificationService
                .send(user, request.getNotificationType(),
                        request.getContent(), request.getRelatedUrl()));

        return ResponseEntity.ok().build();
    }
}

현재 알림 발행 요청을 처리하는 NotificationControllerNotficationService 에 의존성을 지니고 있는 형태이다. 이와 같이 이벤트 처리와 관련하여 서비스 계층에 직접적인 의존성을 가지는 형태로 디자인하게 되면 서비스간 의존성이 추가되고 결합도가 높아질 것이다. 더불어 사이드 이펙트가 발생할 가능성도 커지므로 이벤트를 활용하여 서비스에 대한 결합을 끊는 방식으로 개선할 수 있다.

예제 코드는 특정 User 가 받을 User 를 지정하여 알림을 보내는 기능과 특정 User 가 모든 User 에게 알림을 보내는 기능을 구현하고 있다.

ApplicationEventPublisher

스프링의 ApplicationContext 는 이벤트를 활용할 수 있다. ApplicationEventPublisher 인터페이스로 이벤트를 ApplicationContext 에 넘겨주고 이를 Listener 가 받아서 처리하는 구조로 되어있다.

@FunctionalInterface
public interface ApplicationEventPublisher {

	default void publishEvent(ApplicationEvent event) {
		publishEvent((Object) event);
	}

	void publishEvent(Object event);
}

ApplicationEventPublisherApplicationContext 에 이벤트를 발행해주는 인터페이스로 이 객체에는 이벤트에 필요한 데이터를 저장할 수 있다.

@EventListener

ApplicationContext 에서 이벤트가 발행되면 @EventListener 가 붙은 적합한 메서드를 찾아 실행한다. (적합한 메서드는 event 파라미터의 타입에 따라 탐색한다) publicEvent(Object event) 로 발행했던 event 를 파라미터로 받는 메서드를 모두 실행한다. 이 때 반드시 이벤트를 받을 메서드의 파라미터는 event 만 가지고 있어야 한다.

코드 리팩터링 - @EventListener 활용

@RestController
@RequiredArgsConstructor
public class NotificationController {
    private final ApplicationEventPublisher eventPublisher;

    @PostMapping("/notifications")
    public ResponseEntity<Void> notificationToAll(@RequestBody CreateNotificationRequest request) {
        eventPublisher.publishEvent(request);
        return ResponseEntity.ok().build();
    }
}

앞선 코드에서 NotificationService 에 대한 의존성을 제거하고 ApplicationEventPublisher 를 활용하여 특정 유저에게 다른 모든 유저에게 알림을 보내고 싶다는 요청이 올 시 이벤트를 발행하는 방식으로 코드를 리팩터링하였다.

@Component
@RequiredArgsConstructor
public class NotificationListener {
    private final NotificationService notificationService;
    private final UserQueryService userQueryService;

    @EventListener
    public void handleNotificationForAll(CreateNotificationRequest event){
        List<User> users = userQueryService.findAll();
        users.forEach(user -> notificationService.send(
                user,
                event.getNotificationType(),
                event.getContent(),
                event.getRelatedUrl()
        ));
    }
}

NotificationController 에서 발행된 이벤트는 NotificationListenerhandleNotificationForAll 메서드에 의해 처리된다.

@TransactionalEventListener

이 어노테이션을 사용하면 이벤트 발행 시점을 결정할 수 있다.

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void afterCommit(Event event) {
	// 이벤트 발행 주체가 커밋되면 실행 (default)
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void afterRollBack(Event event) {
	// 이벤트 발행 주체가 롤백되면 실행
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
public void afterCompletion(Event event) {
	// 이벤트 발행 주체가 끝나면 실행 (롤백, 커밋)
}

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void beforeCommit(Event event){
	// 이벤트 발행 주체가 커밋되기 전에 실행
}

phase 를 설정하지 않으면 default로 커밋 후에 실행하게 된다.

만약 이벤트 발행이 포함된 로직과 이벤트 처리 로직이 별도의 트랜잭션에서 서로 영향을 주지 않고 실행되게 하고 싶다면 어떻게 해야할까? 단순히, @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)를 사용해서 이벤트 발행 주체가 커밋하고 이벤트 처리가 실행되도록 하면 되겠다고 생각할 수 있다.

하지만 커밋 후에 이벤트를 발행한 경우 이벤트에서 에러가 발생하더라도 이벤트를 발행한 주체는 이미 커밋이 됐기 떄문에 영향을 주지 못한다. 같은 트랜잭션으로 묶여있는 상황에서 이벤트를 발행하기 전에 커밋이 되어버려 조회는 가능하지만, 쓰기는 불가능하게 된다. 트랜잭션이 사라진 것이 아닌 이벤트가 이미 커밋된 트랜잭션에 참여한 상황이 발생하게 되는 것이다.

@TransactionalEventListener
public void handle(Event event) {
	EventHistory eventHistory = EventConverter.toEventHistory(event);

	saveEventHistory(eventHistory); // insert 발생

	updateUserEventHistory(event.getReceiver(), eventHistory); // update 발생
}

위와 같이 로직이 존재할 때 만약 리스너를 호출하는 서비스 로직이 @Transactional 로 되어 있으면 insert , update 쿼리를 사용할 수 없게 된다.

**Propagation.REQUIRES_NEW**

트랜잭션 전파 속성을 REQUIRES_NEW 로 바꾸면 트랜잭션을 분리할 수 있다.

@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
public void handle(Event event) {
	EventHistory eventHistory = EventConverter.toEventHistory(event);

	saveEventHistory(eventHistory); // insert 발생

	updateUserEventHistory(event.getReceiver(), eventHistory); // update 발생
}

비동기 적용 - @Async

트랜잭션 전파 레벨을 수정하는 것으로 트랜잭션도 분리도 가능하였다. 헌데 만약 이벤트 발행 시 해당 이벤트를 여러번 처리하게 된다면 어떻게 될까?

  1. 유저 A가 유저 B에게 알림을 보내려는 요청을 검증한다.
  2. 유저 A가 유저 B에게 알림을 보내는 이벤트를 발행한다.
    1. 유저 A가 알림을 보냈다는 푸시 알림을 유저 B에게 보낸다.
    2. 유저 B의 알림 현황을 업데이트한다.

위 상황은 하나의 스레드에서 실행하게 된다. 따라서 검증, 푸시 알림, 알림 현황 업데이트 중 하나라도 예상치 못한 에러가 발생하면 순서에 따라 서로 영향을 줄 수도 있다. 이 문제는 푸시 알림에서 에러 처리를 해주면 해결할 수 있다.

하나의 스레드, 다수의 커넥션

spring:
  datasource:
    hikari:
      maximum-pool-size: 1

데이터 커넥션 풀사이즈를 1로 설정하고 앞선 로직을 실행하면 커넥션을 얻기 위해 계속 대기하게 된다. 트랜잭션을 분리하게 되면 분리된 트랜잭션은 기존 커넥션과 다른 커넥션으로 연결된다. 따라서 이벤트에 따라 실행되는 로직이 nn개라면 nn개의 커넥션으로 연결된다. 이 스레드가 끝나지 않는 이상 다수의 커넥션을 연결되어 있는 상태고 이는 성능에 문제가 발생할 수 있다.

트랜잭션 분리와 더불어 더 확실히 이벤트가 서로 영향을 주지 않으려면 비동기를 사용해야 한다.

@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
public void handle(Event event) {
	EventHistory eventHistory = EventConverter.toEventHistory(event);

	saveEventHistory(eventHistory); // insert 발생

	updateUserEventHistory(event.getReceiver(), eventHistory); // update 발생
}
@SpringBootApplication
@EnableAsync
public class SpringAllinoneProjectApplication {
	public static void main(String[] args) {
		SpringApplication.run(SpringAllinoneProjectApplication.class, args);
	}
}

메서드에 @Async 를 붙이고 @SpringBootApplication 이 붙은 main 메서드가 있는 클래스에 @EnableAsync 를 붙여주면 비동기 처리가 가능해진다.

위와 같이 구현하면 매번 에벤트가 발생할 때마다 새 스레드를 생성하게 된다. 스레드를 생성하는 작업도 비용이 들 수 있고 메모리 영역을 차지하기 때문에 비동기 처리를 위한 스레드 풀도 설정하여 보다 효율적으로 사용할 수 있다.

@Configuration
public class AsyncConfig {

    @Bean
    public TaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor asyncExecutor = new ThreadPoolTaskExecutor();
        asyncExecutor.setThreadNamePrefix("async-pool");
        asyncExecutor.setCorePoolSize(10);
        asyncExecutor.initialize();
        return asyncExecutor;
    }
}

무조건적으로 비동기를 사용하기 보다 비즈니스 정책을 고려하여 적절히 비동기, 트랜잭션 분리를 하는 것이 바람직한 방향이라 할 수 있다.

전체 코드

https://github.com/Minjae-An/spring-all-in-one/tree/feat/%2313-notification-with-sse

참고

profile
내가 쓴 코드가 남의 고통이 되지 않도록 하자

0개의 댓글