Spring Boot + SSE로 알림 구현하기 (2)

dradra·2024년 10월 8일
post-thumbnail

0. 엔티티 설계

@Entity
@Getter
@Setter
@NoArgsConstructor
public class Notification {
	@Id
	@GeneratedValue(strategy = GenerationType.IDENTITY)
	private Long id;

	@OneToOne
	@JoinColumn(name = "user_id", nullable = false)
	private User user;

	private boolean isRead;
	@Column(columnDefinition = "TEXT")
	private String message;
	private LocalDateTime createdAt;
}
  • 알림은 User와 1:1 관계로 매핑하였다.
  • 알림의 읽음 처리를 구현하기 위해 isRead 컬럼을 추가했다.
  • 알림의 메시지를 구성하는 message 컬럼을 추가했다.
  • 쪽지가 생성된 시각인 createdAt 컬럼을 추가했다.

1. API 명세

  • 알림 목록을 조회하는 API, 특정 알림을 삭제하는 API, SSE에 연결하는 API를 구성했다.
  • 이 API 명세는 초안이며, 프로젝트가 진행됨에 따라 추가 혹은 변경될 수 있다.

2. 기초적인 컨트롤러 구성

@RestController
@RequestMapping("/notification")
@RequiredArgsConstructor
public class NotificationController {

	private final NotificationService notificationService;

	@GetMapping(value = "/sse/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public SseEmitter subscribe(@PathVariable Long userId) {

		return notificationService.subscribe(userId);
	}
}

1. @RestController@RequestMapping

  • @RestControllerSpring MVC에서 RESTful 웹 서비스의 컨트롤러로 사용되며, 모든 메서드의 반환값이 JSON 형식으로 자동 변환된다.
  • @RequestMapping("/notification")은 이 컨트롤러의 모든 엔드포인트가 /notification 경로를 기본으로 가진다는 것을 의미한다. 즉, 이 클래스에서 정의된 모든 메서드는 /notification 경로를 기반으로 접근할 수 있다.

2. @RequiredArgsConstructor

  • Lombok의 @RequiredArgsConstructor를 사용하여 final로 선언된 필드의 생성자를 자동으로 만들어준다. 여기서는 NotificationService에 대한 의존성을 주입받기 위해 사용되었다.

3. subscribe 메서드

  • @GetMapping(value = "/sse/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

    • /notification/sse/{userId} 경로로 GET 요청을 받을 때 호출된다.

    • produces = MediaType.TEXT_EVENT_STREAM_VALUE를 통해 서버에서 이벤트 스트림 형태로 데이터를 클라이언트에게 전송한다.

    • 특정 userId를 가진 사용자를 위한 SSE 구독을 설정하는 메서드이다.

    • @PathVariable 어노테이션을 통해 URL 경로에서 {userId}를 받아와 해당 값을 userId 매개변수로 사용한다.

    • notificationService.subscribe(userId)를 호출하여 SseEmitter 객체를 반환한다.

SseEmitter란?

  • SseEmitter는 Spring에서 SSE를 지원하기 위해 제공하는 클래스이다.
  • 이 객체를 통해 서버는 클라이언트에게 비동기적으로 데이터를 전송할 수 있다.
  • 클라이언트는 이 연결을 통해 알림을 받을 수 있으며, 페이지를 새로고침하지 않아도 서버로부터 데이터를 수신하게 된다.

3. SSE 연결을 GET 메서드로 설정하는 이유

(1) SSE의 본질적인 특징과 HTTP 메서드의 관계

  • SSE(Server-Sent Events)는 클라이언트가 서버로부터 일방향으로 데이터를 받는 방식이다.
  • 즉, 클라이언트는 서버로부터 연속적으로 데이터를 구독하기 위해 SSE 연결을 설정하고, 이 연결이 유지되는 동안 서버에서 데이터를 보낸다.
  • 클라이언트는 단지 서버로부터 데이터를 수신하는 것에 집중한다.
    • GET 메서드는 HTTP에서 리소스를 요청하고 그 결과를 받기 위해 사용된다. 클라이언트가 서버에 데이터를 보내지 않고, 서버로부터 정보를 받는 역할인 것이다. SSE의 본질이 서버에서 데이터를 받는 역할을 하기 때문에, GET 메서드가 자연스럽다.

(2) RESTful API와 일관성

  • RESTful API 관점에서 봤을 때, GET 메서드는 리소스를 조회할 때 사용된다. 클라이언트가 서버에 알림 스트림을 구독하는 것도 결국 일종의 리소스 조회로 볼 수 있다. SSE에서는 클라이언트가 서버로부터 지속적인 데이터를 "받는" 요청이므로, GET이 가장 적합하다.
  • 만약 POSTPUT을 사용한다면, 일반적으로 서버에 데이터를 전송하거나 리소스를 변경하는 의미가 있는데, SSE는 이런 목적이 아니므로, GET을 사용해야 한다.

(3) HTTP 표준 준수

  • HTTP에서 데이터를 구독하거나 계속해서 정보를 요청하는 것은 GET 요청을 통해 이루어지도록 권장된다. POST, PUT 같은 메서드는 주로 데이터를 생성하거나 업데이트할 때 사용되지, 단순한 데이터를 구독하는 데는 적합하지 않다.

4. SseEmitter를 return하는 이유

JSON 형태의 데이터를 return하는 것이 아닌, "왜 SseEmitter 객체를 리턴할까?"에 대한 의문이 생겼다.

  • 결론부터 말하자면 SseEmitter를 클라이언트에게 반환하는 이유는 SSE 연결을 지속적으로 유지하기 위해서이다.
  • 클라이언트가 서버에 SSE 요청을 보내면, 서버는 이 SseEmitter 객체를 사용해서 클라이언트와의 연결을 지속적으로 열어두고 클라이언트에게 실시간 데이터를 보낼 수 있는 파이프라인을 형성하는 것이다.

SseEmitter를 반환하지 않으면?

  • 클라이언트는 서버로부터 실시간 데이터를 받을 수 없고, 연결이 바로 종료되어버린다.
  • SseEmitter는 서버가 이벤트를 클라이언트로 보낼 수 있는 수단이다. 클라이언트는 이를 통해 실시간으로 알림을 받을 수 있게 된다.
  • 만약 SseEmitter를 반환하지 않으면, 서버와 클라이언트 사이의 실시간 통신이 불가능해져서 SSE의 목적을 달성할 수 없게 된다.

5. SseEmitter 관리는 어떻게 하는가?

SseEmitter는 여러 클라이언트가 동시에 연결될 수 있기 때문에 유저별로 Emitter를 잘 저장하고 관리해야 한다.
각 유저가 알림을 구독하고 있을 때, 해당 유저에게만 알림을 보내야 하는데, 그것을 위해서 각 유저의 SseEmitterMap 형태로 저장하고, 필요할 때마다 꺼내서 이벤트를 보내는 방식으로 관리한다.

유저별 SseEmitter 저장

  • 각 유저에게 할당된 SseEmitterConcurrentHashMap 같은 자료구조에 저장하고, 나중에 알림을 보낼 때 해당 유저의 Emitter에 알림을 전송한다.
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Service
public class NotificationService {
    private final ConcurrentHashMap<Long, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
    public SseEmitter subscribe(Long userId) {
        SseEmitter emitter = new SseEmitter();
                sseEmitters.put(userId, emitter);

        try {
            emitter.send(SseEmitter.event().name("INIT").data("SSE 연결됨"));
        } catch (IOException e) {
            emitter.completeWithError(e);
        }

        emitter.onCompletion(() -> sseEmitters.remove(userId));
        emitter.onTimeout(() -> sseEmitters.remove(userId));

        return emitter;
    }
}
  1. ConcurrentHashMap<Long, SseEmitter>를 사용해서 유저 ID별로 SseEmitter를 저장한다.
  2. 클라이언트가 SSE 요청을 보낼 때마다 새로운 Emitter를 생성해서 Map에 저장하고, 나중에 알림을 보낼 때 이 맵에서 유저의 Emitter를 찾아서 이벤트를 전송할 수 있다.
  3. 또한, Emitter 연결이 완료되거나 타임아웃이 발생하면 해당 Emitter를 Map에서 삭제해서 메모리 누수를 방지할 수 있다.

(1) ConcurrentHashMap을 사용하여 SSE 연결을 관리하는 이유

  • ConcurrentHashMap멀티스레드 환경에서 안전하게 데이터를 처리할 수 있는 동시성(Concurrency) 지원 자료구조다.
  • SSE를 사용할 때는 여러 클라이언트가 동시에 서버에 연결해서 알림을 받을 수 있다.
  • 이때, 여러 스레드가 동시에 SseEmitter를 추가하거나 삭제하는 상황이 발생할 수 있는데, 이 상황을 안전하게 처리하기 위해 ConcurrentHashMap을 사용하는 것이다.

이유를 자세히 설명하자면,

멀티스레드 환경에서는 여러 사용자가 동시에 알림을 구독할 때, 각각의 유저는 다른 스레드에서 처리될 수 있다. 이 때, 스레드 간의 데이터 충돌이나 동기화 문제를 막기 위해 ConcurrentHashMap이 필요하다.


기존의 HashMap은 멀티스레드 환경에서 Thread-Safe하지 않기 때문에, 동시 접근시 데이터 손실이나 충돌과 같은 문제가 발생할 수 있다. ConcurrentHashMap내부적으로 Segment Locking하는 방식으로 동시성을 보장하면서도 성능을 유지한다.


뿐만 아니라, 알림을 실시간으로 처리하는 상황에서는 SseEmitter클라이언트와 연결이 끊기면 바로 삭제되어야 하고, 새로운 연결이 생기면 즉시 추가되어야 한다. ConcurrentHashMap은 이러한 빈번한 삽입/삭제 작업을 빠르고 안전하게 처리할 수 있다.

(2) 대안이 있을까?

  1. HashMap + synchronized:

    • HashMap을 사용하고 synchronized 블록을 사용해서 동시성을 보장할 수도 있다.
    • 하지만 이 방식은 성능 저하가 발생할 가능성이 높다. HashMap 전체에 락을 걸기 때문에 여러 스레드가 동시에 작업할 수 없고, 효율성이 떨어진다.
  2. ConcurrentSkipListMap:

    • ConcurrentSkipListMapConcurrentHashMap과 비슷하게 동시성을 보장하지만, 정렬된 키 값을 유지하는 특징이 있다.
    • 하지만, 여기서는 정렬이 필요하지 않고, 추가적인 오버헤드가 발생할 수 있어 적합하지 않다.
  3. CopyOnWriteArrayList:

    • CopyOnWriteArrayList읽기 작업이 훨씬 많고, 쓰기 작업이 적은 상황에서 유리한 자료구조이다.
    • 하지만 여기선 알림 전송 과정에서 쓰기 작업(Emitter 추가/삭제)가 빈번하게 일어날 수 있기 때문에 성능이 떨어질 수 있다.

(3) Spring에서 추천하는 방식인가?

Spring 자체에서 공식적으로 ConcurrentHashMap을 쓰라고 명시적으로 추천하는 것은 아니지만, 하지만 멀티스레드 환경에서 동시성이 보장된 자료구조를 사용하는 것은 권장되는 방식이다.
특히 SSE 같은 실시간 알림 기능에서는 여러 클라이언트가 동시에 연결되고 해제되기 때문에, 이런 동시성 이슈를 처리하기 위한 방안을 반드시 마련해야 한다.

  • Spring Security에서도 유사한 동시성 문제를 해결할 때, 내부적으로 ConcurrentHashMap을 사용하는 경우가 있다.

6. 추가로 해야하는 작업들 (다음 편에 계속...)

알림을 발생시키고, 그 데이터를 처리해서 클라이언트로 전송하는 과정이 필요하다. 여기서 해야 할 일은 크게 두 가지인데,

  1. 알림 save: 새로운 알림이 발생하면 DB에 알림 데이터를 저장해야 한다. 예를 들어, 누군가가 메시지를 보냈다거나 새로운 이벤트가 발생했을 때, 그걸 기록하는 것이다.

    • 알림이 발생하면 Notification 엔티티에 저장하고, 클라이언트가 알림을 받으면 isRead 같은 플래그를 업데이트할 수 있다.
  2. 알림 전송: 알림이 발생하면 해당 유저에게 SseEmitter를 통해 실시간으로 알림을 보내야 한다.

0개의 댓글