알림 기능을 구현하기 위해 SSE(Server Sent Event)를 공부하던 와중 스프링에서는 이벤트라는 개념을 다루기 위해 어떤 지원하는 기능이 있는 지 궁금하여 찾아보게 되었다.
@RestController
@RequiredArgsConstructor
public class NotificationController {
private final UserQueryService userQueryService;
private final NotificationService notificationService;
@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId,
@RequestParam("userId") Long userId) {
return notificationService.subscribe(userId, lastEventId);
}
@PostMapping("/notification/{receiverId}")
public ResponseEntity<Void> notification(@PathVariable("receiverId") Long receiverId
, @RequestBody CreateNotificationRequest request) {
User receiver = userQueryService.findById(receiverId);
notificationService.send(receiver, request.getNotificationType(),
request.getContent(), request.getRelatedUrl());
return ResponseEntity.ok().build();
}
@PostMapping("/notifications")
public ResponseEntity<Void> notificationToAll(@RequestBody CreateNotificationRequest request) {
List<User> users = userQueryService.findAll();
users.forEach(user -> notificationService
.send(user, request.getNotificationType(),
request.getContent(), request.getRelatedUrl()));
return ResponseEntity.ok().build();
}
}
현재 알림 발행 요청을 처리하는 NotificationController
는 NotficationService
에 의존성을 지니고 있는 형태이다. 이와 같이 이벤트 처리와 관련하여 서비스 계층에 직접적인 의존성을 가지는 형태로 디자인하게 되면 서비스간 의존성이 추가되고 결합도가 높아질 것이다. 더불어 사이드 이펙트가 발생할 가능성도 커지므로 이벤트를 활용하여 서비스에 대한 결합을 끊는 방식으로 개선할 수 있다.
예제 코드는 특정 User
가 받을 User
를 지정하여 알림을 보내는 기능과 특정 User
가 모든 User
에게 알림을 보내는 기능을 구현하고 있다.
스프링의 ApplicationContext
는 이벤트를 활용할 수 있다. ApplicationEventPublisher
인터페이스로 이벤트를 ApplicationContext
에 넘겨주고 이를 Listener
가 받아서 처리하는 구조로 되어있다.
@FunctionalInterface
public interface ApplicationEventPublisher {
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
void publishEvent(Object event);
}
ApplicationEventPublisher
는 ApplicationContext
에 이벤트를 발행해주는 인터페이스로 이 객체에는 이벤트에 필요한 데이터를 저장할 수 있다.
ApplicationContext
에서 이벤트가 발행되면 @EventListener
가 붙은 적합한 메서드를 찾아 실행한다. (적합한 메서드는 event
파라미터의 타입에 따라 탐색한다) publicEvent(Object event)
로 발행했던 event
를 파라미터로 받는 메서드를 모두 실행한다. 이 때 반드시 이벤트를 받을 메서드의 파라미터는 event
만 가지고 있어야 한다.
@RestController
@RequiredArgsConstructor
public class NotificationController {
private final ApplicationEventPublisher eventPublisher;
@PostMapping("/notifications")
public ResponseEntity<Void> notificationToAll(@RequestBody CreateNotificationRequest request) {
eventPublisher.publishEvent(request);
return ResponseEntity.ok().build();
}
}
앞선 코드에서 NotificationService
에 대한 의존성을 제거하고 ApplicationEventPublisher
를 활용하여 특정 유저에게 다른 모든 유저에게 알림을 보내고 싶다는 요청이 올 시 이벤트를 발행하는 방식으로 코드를 리팩터링하였다.
@Component
@RequiredArgsConstructor
public class NotificationListener {
private final NotificationService notificationService;
private final UserQueryService userQueryService;
@EventListener
public void handleNotificationForAll(CreateNotificationRequest event){
List<User> users = userQueryService.findAll();
users.forEach(user -> notificationService.send(
user,
event.getNotificationType(),
event.getContent(),
event.getRelatedUrl()
));
}
}
NotificationController
에서 발행된 이벤트는 NotificationListener
의 handleNotificationForAll
메서드에 의해 처리된다.
이 어노테이션을 사용하면 이벤트 발행 시점을 결정할 수 있다.
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void afterCommit(Event event) {
// 이벤트 발행 주체가 커밋되면 실행 (default)
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void afterRollBack(Event event) {
// 이벤트 발행 주체가 롤백되면 실행
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
public void afterCompletion(Event event) {
// 이벤트 발행 주체가 끝나면 실행 (롤백, 커밋)
}
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void beforeCommit(Event event){
// 이벤트 발행 주체가 커밋되기 전에 실행
}
phase
를 설정하지 않으면 default로 커밋 후에 실행하게 된다.
만약 이벤트 발행이 포함된 로직과 이벤트 처리 로직이 별도의 트랜잭션에서 서로 영향을 주지 않고 실행되게 하고 싶다면 어떻게 해야할까? 단순히, @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
를 사용해서 이벤트 발행 주체가 커밋하고 이벤트 처리가 실행되도록 하면 되겠다고 생각할 수 있다.
하지만 커밋 후에 이벤트를 발행한 경우 이벤트에서 에러가 발생하더라도 이벤트를 발행한 주체는 이미 커밋이 됐기 떄문에 영향을 주지 못한다. 같은 트랜잭션으로 묶여있는 상황에서 이벤트를 발행하기 전에 커밋이 되어버려 조회는 가능하지만, 쓰기는 불가능하게 된다. 트랜잭션이 사라진 것이 아닌 이벤트가 이미 커밋된 트랜잭션에 참여한 상황이 발생하게 되는 것이다.
@TransactionalEventListener
public void handle(Event event) {
EventHistory eventHistory = EventConverter.toEventHistory(event);
saveEventHistory(eventHistory); // insert 발생
updateUserEventHistory(event.getReceiver(), eventHistory); // update 발생
}
위와 같이 로직이 존재할 때 만약 리스너를 호출하는 서비스 로직이 @Transactional
로 되어 있으면 insert
, update
쿼리를 사용할 수 없게 된다.
트랜잭션 전파 속성을 REQUIRES_NEW
로 바꾸면 트랜잭션을 분리할 수 있다.
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
public void handle(Event event) {
EventHistory eventHistory = EventConverter.toEventHistory(event);
saveEventHistory(eventHistory); // insert 발생
updateUserEventHistory(event.getReceiver(), eventHistory); // update 발생
}
트랜잭션 전파 레벨을 수정하는 것으로 트랜잭션도 분리도 가능하였다. 헌데 만약 이벤트 발행 시 해당 이벤트를 여러번 처리하게 된다면 어떻게 될까?
위 상황은 하나의 스레드에서 실행하게 된다. 따라서 검증, 푸시 알림, 알림 현황 업데이트 중 하나라도 예상치 못한 에러가 발생하면 순서에 따라 서로 영향을 줄 수도 있다. 이 문제는 푸시 알림에서 에러 처리를 해주면 해결할 수 있다.
spring:
datasource:
hikari:
maximum-pool-size: 1
데이터 커넥션 풀사이즈를 1로 설정하고 앞선 로직을 실행하면 커넥션을 얻기 위해 계속 대기하게 된다. 트랜잭션을 분리하게 되면 분리된 트랜잭션은 기존 커넥션과 다른 커넥션으로 연결된다. 따라서 이벤트에 따라 실행되는 로직이 개라면 개의 커넥션으로 연결된다. 이 스레드가 끝나지 않는 이상 다수의 커넥션을 연결되어 있는 상태고 이는 성능에 문제가 발생할 수 있다.
트랜잭션 분리와 더불어 더 확실히 이벤트가 서로 영향을 주지 않으려면 비동기를 사용해야 한다.
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
public void handle(Event event) {
EventHistory eventHistory = EventConverter.toEventHistory(event);
saveEventHistory(eventHistory); // insert 발생
updateUserEventHistory(event.getReceiver(), eventHistory); // update 발생
}
@SpringBootApplication
@EnableAsync
public class SpringAllinoneProjectApplication {
public static void main(String[] args) {
SpringApplication.run(SpringAllinoneProjectApplication.class, args);
}
}
메서드에 @Async
를 붙이고 @SpringBootApplication
이 붙은 main 메서드가 있는 클래스에 @EnableAsync
를 붙여주면 비동기 처리가 가능해진다.
위와 같이 구현하면 매번 에벤트가 발생할 때마다 새 스레드를 생성하게 된다. 스레드를 생성하는 작업도 비용이 들 수 있고 메모리 영역을 차지하기 때문에 비동기 처리를 위한 스레드 풀도 설정하여 보다 효율적으로 사용할 수 있다.
@Configuration
public class AsyncConfig {
@Bean
public TaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor asyncExecutor = new ThreadPoolTaskExecutor();
asyncExecutor.setThreadNamePrefix("async-pool");
asyncExecutor.setCorePoolSize(10);
asyncExecutor.initialize();
return asyncExecutor;
}
}
무조건적으로 비동기를 사용하기 보다 비즈니스 정책을 고려하여 적절히 비동기, 트랜잭션 분리를 하는 것이 바람직한 방향이라 할 수 있다.
https://github.com/Minjae-An/spring-all-in-one/tree/feat/%2313-notification-with-sse