SSE (Server-Sent Event) - 실시간 알림 기능

박영준·2023년 8월 12일
2

Spring

목록 보기
51/58

1. 비교

1) HTTP 통신

(1) 문제점

댓글/좋아요 등의 알림은 유저의 요청(request)가 없더라도 실시간으로 서버의 변경 사항을 웹 브라우저에 갱신해줘야 하는 서비스다.

그러나 HTTP 통신(POST, GET 요청 등...) 은 클라이언트의 요청(request)가 있어야만, 서버가 응답(response)가 가능하다
(실시간 통신이 아니다)

(2) 보완 방법

HTTP 통신으로도 실시간 통신이 가능한 방법이 2가지 있다.

① Polling

  • 일정 주기를 가지고 서버의 API를 호출하는 방법

  • 예시

    • 클라이언트에서 5초마다 한 번씩 알림 목록을 호출한다면,
    • 업데이트 내역이 5초마다 갱신되며 변경 사항을 적용할 수 있다.
  • 장점

    • 기본적인 HTTP 통신을 기반으로 하기 때문에, 호환성이 좋다.
  • 단점

    • 업데이트 주기가 길다면, 실시간으로 데이터가 갱신 X
    • 업데이트 주기가 짧다면, 갱신 사항이 없음에도 서버에 요청이 들어와 불필요한 서버 부하가 발생

② Long-Polling

  • 업데이트 발생시에만 응답을 보내는 방식

    • 서버로 요청이 들어올 경우, 일정 시간동안 대기하였다가 요청한 데이터가 업데이트 되면 웹 브라우저에게 응답을 보낸다.
    • Polling에서 불필요한 응답을 주는 경우를 줄일 수 있다.
  • 장점

    • 연결이 된 경우, 실시간으로 데이터가 들어올 수 있음
  • 단점

    • 데이터 업데이트가 빈번하게 일어난다면, 연결을 위한 요청도 똑같이 발생하므로 Polling과 유사하게 서버에 부하가 일어날 수 있음

2) Websocket

  • 서버와 웹브라우저 사이 양방향 통신이 가능한 방법

  • 예시

    • 변경 사항에 빠르게 반응해야하는 채팅
    • 리소스 상태에 대한 지속적 업데이트가 필요한 문서 동시 편집
  • 장점

    • 지속적인 양방향 통신 가능
  • 단점

    • 연결을 유지에는 비용이 들어가는데, 트래픽 양이 많아진다면 서버에 큰 부담

3) SSE (Server-Sent Event)

  • 사용법

    • 웹 브라우저에서 서버쪽으로 특정 이벤트를 구독하면,
    • 서버에서는 해당 이벤트 발생시, 웹브라우저 쪽으로 이벤트를 보내주는 방식
  • 장점

    • 한 번만 연결 요청을 보내면, 연결이 종료될 때까지 재연결 과정 없이 서버에서 웹 브라우저로 데이터를 계속해서 보낼 수 있음
  • 단점

    • 단방향 통신(서버에서 웹 브라우저로만 데이터 전송이 가능)
    • 최대 동시 접속 횟수 제한

결론

  • Polling : 실시간 통신을 위해서는 업데이트 주기를 짧게 해야 하지만, 빈번한 HTTP 요청에 의해 트래픽이 많아질 경우 서버에 부하 大
  • Long-Polling : (Polling 와 동일하게) 빈번한 HTTP 요청에 의해 트래픽이 많아질 경우 서버에 부하 大
  • WebSocket : 서버와 클라이언트의 양방향 통신
  • SSE : 서버에서 클라이언트로의 단방향 통신

알림 기능의 경우, 채팅이 아니므로 양방향 통신일 필요가 없다. 그래서 SSE 가 가장 적절하다.

2. 구현

1) 알림 메시지만 전송할 경우

NotificationController

@RestController
@RequiredArgsConstructor
public class NotificationController {
    private final NotificationService notificationService;
    public static Map<Long, SseEmitter> sseEmitters = new ConcurrentHashMap<>();		// 1. 모든 Emitters를 저장하는 ConcurrentHashMap

    // 메시지 알림
    @GetMapping("/api/notification/subscribe")
    public SseEmitter subscribe(@AuthenticationPrincipal UserDetailsImpl userDetails) {
        Long userId = userDetails.getUser().getId();
        SseEmitter sseEmitter = notificationService.subscribe(userId);

        return sseEmitter;
    }
}

1.

  • SSE 를 통한 알림 기능을 받을 사용자들을 등록(저장)할 장소
  • 프론트 쪽에서 사용자가 '로그인'을 하면, 해당 사용자를 sseEmitters 에 등록되도록 하면 된다.

NotificationService

@Service
@RequiredArgsConstructor
public class NotificationService {
    private final PostRepository postRepository;
    private final UserRepository userRepository;

    // 메시지 알림
    public SseEmitter subscribe(Long userId) {
    
        // 1. 현재 클라이언트를 위한 sseEmitter 객체 생성
        SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
        
        // 2. 연결
        try {
            sseEmitter.send(SseEmitter.event().name("connect"));
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 3. 저장
        NotificationController.sseEmitters.put(userId, sseEmitter);

		// 4. 연결 종료 처리
        sseEmitter.onCompletion(() -> NotificationController.sseEmitters.remove(userId));	// sseEmitter 연결이 완료될 경우
        sseEmitter.onTimeout(() -> NotificationController.sseEmitters.remove(userId));		// sseEmitter 연결에 타임아웃이 발생할 경우
        sseEmitter.onError((e) -> NotificationController.sseEmitters.remove(userId));		// sseEmitter 연결에 오류가 발생할 경우

        return sseEmitter;
    }

    // 채팅 수신 알림 - receiver 에게
    public void notifyMessage(String receiver) {
    	// 5. 수신자 정보 조회
        User user = userRepository.findByNickname(receiver);

		// 6. 수신자 정보로부터 id 값 추출
        Long userId = user.getId();

		// 7. Map 에서 userId 로 사용자 검색
        if (NotificationController.sseEmitters.containsKey(userId)) {		
            SseEmitter sseEmitterReceiver = NotificationController.sseEmitters.get(userId);
            // 8. 알림 메시지 전송 및 해체
            try {
                sseEmitterReceiver.send(SseEmitter.event().name("addMessage").data("메시지가 왔습니다."));
            } catch (Exception e) {
                NotificationController.sseEmitters.remove(userId);
            }
        }
    }
    
    // 댓글 알림 - 게시글 작성자 에게
    public void notifyComment(Long postId) {
        Post post = postRepository.findById(postId).orElseThrow(
                () -> new IllegalArgumentException("게시글을 찾을 수 없습니다.")
        );

        Long userId = post.getUser().getId();

        if (NotificationController.sseEmitters.containsKey(userId)) {
            SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
            try {
                sseEmitter.send(SseEmitter.event().name("addComment").data("댓글이 달렸습니다."));
            } catch (Exception e) {
                NotificationController.sseEmitters.remove(userId);
            }
        }
    }
}

1.

  • Long.MAX_VALUE
    • java.lang 패키지의 상수
    • 연결이 최대한 오랫동안 유지되도록 타임아웃을 설정하는 값

2.

  • sseEmitter 객체를 통해 "connect"라는 이름의 이벤트를 클라이언트로 전송
  • 클라이언트와 연결되었음을 나타내는 이벤트

3.

  • userId 를 key 값으로 해서 sseEmitter 객체를 Map 에 저장

4.

  • 각각의 경우에 해당 연결 정보를 Map 에서 제거하는 역할
  • 이를 통해, 더이상 필요하지 않은 연결 정보를 정리해서 메모리 누수를 방지

6., 7.

  • userId 를 추출한 것은 3.에서 userId 를 key 값으로 Map 에 등록해뒀기 때문

8.
(SSE 를 2.에서처럼 연결해두고, 메시지가 전송되거나 게시글에 댓글이 달리면 다음처럼 연결해둔 SSE로 알림이 온다)

  • name : 이벤트를 보낼 이름을 설정
  • data : 보낼 알림 메시지 설정
  • 예외가 발생하면, 해당 연결 정보를 맵에서 제거하여 정리

CommentController

@RestController
@RequestMapping("/api/post/{category}/{postId}")
@RequiredArgsConstructor
public class CommentController {

    private final CommentService commentService;
    private final NotificationService notificationService;

    @PostMapping
    public ResponseEntity<CommentResponseDto> createComment(@PathVariable Long category, @PathVariable Long postId, @RequestBody CommentRequestDto requestDto, @AuthenticationPrincipal UserDetailsImpl userDetails) {
        CommentResponseDto response = commentService.createComment(category, postId, requestDto, userDetails.getUser());
        
        notificationService.notifyComment(postId);		// 댓글 알림 - 게시글 작성자 에게
        
        return new ResponseEntity<>(response, HttpStatus.OK);
    }
    
    ...
    
}    

댓글 작성 시 알림이 오도록 하고 싶다면, 다음처럼 notificationService 에서 구현 후 이를 호출하는 코드 한 줄만 추가해줘도 가능하다.

2) 알림 메시지 + α 전송할 경우 (+ 알림 삭제)

알림 메시지만 전송할 경우, 간단하게 SSE 를 통한 알림 기능을 구현해볼 수 있다.
그러나 알림에는 알림 내용, 누구에게서 온 알림인지, 알림이 온 시간 등... 이 함께 온다면 사용자에게 더욱 편리해질 것이라 생각했다.

NotificationController

@RestController
@RequiredArgsConstructor
public class NotificationController {
    private final NotificationService notificationService;
    public static Map<Long, SseEmitter> sseEmitters = new ConcurrentHashMap<>();

    // 메시지 알림
    @GetMapping("/api/notification/subscribe")
    public SseEmitter subscribe(@AuthenticationPrincipal UserDetailsImpl userDetails) {
        Long userId = userDetails.getUser().getId();
        SseEmitter sseEmitter = notificationService.subscribe(userId);

        return sseEmitter;
    }

    // 알림 삭제
    @DeleteMapping("/api/notification/delete/{id}")
    public MsgResponseDto deleteNotification(@PathVariable Long id) throws IOException {
        return notificationService.deleteNotification(id);
    }
}
  • 메시지 알림(SSE 연결, 구독) 와 알림 삭제를 제외하곤 다른 API 는 추가하지 않았다.
    • 알림 삭제 기능이 필요했기 때문에, SSE 를 통한 실시간 알림 전송 후 이를 DB 에 저장해줬다.
      • 알림 저장을 위해 Notification 엔티티와 레포지토리가 필요했다.

Notification

@Entity
@Setter
@Getter
@Table(name = "notification")
@NoArgsConstructor
public class Notification {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String sender;

    private LocalDateTime createdAt;

    private String contents;        // 채팅 메시지 내용 또는 댓글 내용

    private String roomId;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "post_id")
    private Post post;
}

NotificationRepository

public interface NotificationRepository extends JpaRepository<Notification, Long> {
    Optional<Notification> findById(Long id);
}

NotificationService

@Service
@RequiredArgsConstructor
public class NotificationService {
    private final PostRepository postRepository;
    private final UserRepository userRepository;
    private final NotificationRepository notificationRepository;
    private final CommentRepository commentRepository;
    private final MessageRepository messageRepository;
    private final MessageRoomRepository messageRoomRepository;
    private static Map<Long, Integer> notificationCounts = new HashMap<>();     // 알림 개수 저장

    // 메시지 알림
    public SseEmitter subscribe(Long userId) {
        // 현재 클라이언트를 위한 sseEmitter 생성
        SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
        try {
            // 연결
            sseEmitter.send(SseEmitter.event().name("connect"));
        } catch (IOException e) {
            e.printStackTrace();
        }

        // user 의 pk 값을 key 값으로 해서 sseEmitter 를 저장
        NotificationController.sseEmitters.put(userId, sseEmitter);

        sseEmitter.onCompletion(() -> NotificationController.sseEmitters.remove(userId));
        sseEmitter.onTimeout(() -> NotificationController.sseEmitters.remove(userId));
        sseEmitter.onError((e) -> NotificationController.sseEmitters.remove(userId));

        return sseEmitter;
    }

    // 메시지 알림 - receiver 에게
    public void notifyMessage(String roomId, String receiver, String sender) {
        MessageRoom messageRoom = messageRoomRepository.findByRoomId(roomId);

        Post post = postRepository.findById(messageRoom.getPost().getId()).orElseThrow(
                () -> new IllegalArgumentException("게시글을 찾을 수 없습니다.")
        );

        User user = userRepository.findByNickname(receiver);

        User userSender = userRepository.findByNickname(sender);

        Message receiveMessage = messageRepository.findFirstBySenderOrderByCreatedAtDesc(userSender.getNickname()).orElseThrow(
                () -> new IllegalArgumentException("메시지를 찾을 수 없습니다.")
        );

        Long userId = user.getId();

        if (NotificationController.sseEmitters.containsKey(userId)) {
            SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
            try {
                Map<String, String> eventData = new HashMap<>();
                eventData.put("message", "메시지가 왔습니다.");
                eventData.put("sender", receiveMessage.getSender());                    // 메시지 보낸자
                eventData.put("createdAt", receiveMessage.getCreatedAt().toString());   // 메시지를 보낸 시간
                eventData.put("contents", receiveMessage.getMessage());                 // 메시지 내용

                sseEmitter.send(SseEmitter.event().name("addMessage").data(eventData));

                // DB 저장
                Notification notification = new Notification();
                notification.setSender(receiveMessage.getSender());
                notification.setCreatedAt(receiveMessage.getCreatedAt());
                notification.setContents(receiveMessage.getMessage());
                notification.setRoomId(messageRoom.getRoomId());
                notification.setPost(post);         // post 필드 설정
                notificationRepository.save(notification);

                // 알림 개수 증가
                notificationCounts.put(userId, notificationCounts.getOrDefault(userId, 0) + 1);

                // 현재 알림 개수 전송
                sseEmitter.send(SseEmitter.event().name("notificationCount").data(notificationCounts.get(userId)));

            } catch (Exception e) {
                NotificationController.sseEmitters.remove(userId);
            }
        }
    }
    
    // 댓글 알림 - 게시글 작성자 에게
    public void notifyComment(Long postId) {
        Post post = postRepository.findById(postId).orElseThrow(
                () -> new IllegalArgumentException("게시글을 찾을 수 없습니다.")
        );

        Comment receiveComment = commentRepository.findFirstByPostIdOrderByCreatedAtDesc(post.getId()).orElseThrow(
                () -> new IllegalArgumentException("댓글을 찾을 수 없습니다.")
        );

        Long userId = post.getUser().getId();

        if (NotificationController.sseEmitters.containsKey(userId)) {
            SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
            try {
                Map<String, String> eventData = new HashMap<>();
                eventData.put("message", "댓글이 달렸습니다.");
                eventData.put("sender", receiveComment.getUser().getNickname());        // 댓글 작성자
                eventData.put("createdAt", receiveComment.getCreatedAt().toString());   // 댓글이 달린 시간
                eventData.put("contents", receiveComment.getComment());                 // 댓글 내용

                sseEmitter.send(SseEmitter.event().name("addComment").data(eventData));

                // DB 저장
                Notification notification = new Notification();
                notification.setSender(receiveComment.getUser().getNickname());
                notification.setCreatedAt(receiveComment.getCreatedAt());
                notification.setContents(receiveComment.getComment());
                notification.setPost(post);         // post 필드 설정
                notificationRepository.save(notification);

                // 알림 개수 증가
                notificationCounts.put(userId, notificationCounts.getOrDefault(userId, 0) + 1);

                // 현재 알림 개수 전송
                sseEmitter.send(SseEmitter.event().name("notificationCount").data(notificationCounts.get(userId)));

            } catch (IOException e) {
                NotificationController.sseEmitters.remove(userId);
            }
        }
    }

    // 알림 삭제
    public MsgResponseDto deleteNotification(Long id) throws IOException {
        Notification notification = notificationRepository.findById(id).orElseThrow(
                () -> new IllegalArgumentException("알림을 찾을 수 없습니다.")
        );

        Long userId = notification.getPost().getUser().getId();

        notificationRepository.delete(notification);

        // 알림 개수 감소
        if (notificationCounts.containsKey(userId)) {
            int currentCount = notificationCounts.get(userId);
            if (currentCount > 0) {
                notificationCounts.put(userId, currentCount - 1);
            }
        }
        
        // 현재 알림 개수 전송
        SseEmitter sseEmitter = NotificationController.sseEmitters.get(userId);
        sseEmitter.send(SseEmitter.event().name("notificationCount").data(notificationCounts.get(userId)));

        return new MsgResponseDto("알림이 삭제되었습니다.", HttpStatus.OK.value());
    }
}

알림 메시지 + α

  • Map 과 put 을 통해 다수의 데이터들을 입력했다.

알림 전송 시간

  • 알림 전송 시간을 받아올 때 첫 채팅 메시지or댓글을 전송한 시간이 고정되어 전송되는 문제가 있었다.

  • 이는 findFirstBySenderOrderByCreatedAtDesc 를 통해 해결할 수 있었다.

    • findFirst: 첫 번째 항목을 반환
    • BySender: Sender 필드 기준
    • OrderByCreatedAtDesc: createdAt 필드를 내림차순으로 정렬

    즉, 특정 Sender의 메시지 중에서 최신 메시지를 가져올 수 있는 것이다.

알림 개수 저장

  • 알림 개수는 Map 을 통해 저장했다.
  • SSE 를 통해 알림 개수 증가현재 알림 개수 전송 을 구현
  • HTTP 통신으로 알림 삭제가 이루어지고, 현재 알림 개수가 SSE 로 즉시 전송되도록 구현
    (프론트에서도 개수 처리를 해줄 수 있다고 했다. 따라서, 결론적으론 현재 불필요한 로직이나 정리를 위해 적어두었다)

참고: [백엔드|스프링부트] 알림 기능은 어떻게 구현하는게 좋을까?
참고: 🔥 TIL - Day 64 SSE를 이용한 실시간 알림
참고: [Photogram] 실시간 알림
참고: [Spring + SSE] Server-Sent Events를 이용한 실시간 알림

profile
개발자로 거듭나기!

0개의 댓글