진행 중인 프로젝트에 실시간 알림 기능 구현이 필요해졌다.
실시간 알림 기능은 클라이언트가 서버에게 요청을 보내는 기존 기능과는 달리, 서버가 클라이언트에게 데이터를 보내면 클라이언트가 이를 인지하고 알림을 띄워줘야하는 구조를 가지고 있습니다.
[Client] SSE Subscribe 요청
클라이언트측에서 우선 서버의 이벤트 구독을 위한 요청을 보내야 합니다.
이벤트의 미디어 타입은 text/event-stream이 표준으로 정해져있습니다.
[Server] Subscription에 대한 응답
Response의 미디어 타입은 text/event-stream 입니다. 이때 Transfer-Encoding 헤더의 값을 chunked로 설정하는데, 왜냐하면 서버는 동적으로 생성된 컨텐츠를 스트리밍 하기 때문에 본문의 크기를 미리 알 수 없기 때문입니다.
[Server] 이벤트 전달
클라이언트에서 subscribe를 하면, 서버는 해당 클라이언트에게 비동기적으로 데이터를 전송할 수 있습니다.(데이터는 utf-8로 인코딩된 텍스트 데이터만 가능합니다.)
서로 다른 이벤트는 \n\n 로 구분되며, 각각의 이벤트는 한 개 이상의 name:value로 구성됩니다. (이벤트 안의 내용은 \n으로 구분됩니다.)
SseEmitter는 Spring Framework에서 제공하는 SSE를 구현한 클래스이다.
클라이언트가 SseEmitter 객체를 생성하고 이를 서버에 등록하면, 서버는 이벤트 스트림을 생성하고 SseEmitter 객체를 통해 지속적으로 클라이언트에게 이벤트를 전송하기위해 사용한다.
timeout은 10분으로 설정해놨는데 추후에 알아보고 어떻게 설정하는것이 좋은지 적용시켜볼 생각이다.
[23.04.05 Nginx사용시 추가적인 설정이 필요]
public SseEmitter subscribe(Long memberId, String lastEventId) {
// emitter 에 고유값을 주기위해
String emitterId = makeTimeIncludeUd(memberId);
Long timeout = 10L * 1000L * 60L; // 10분
// 생성된 emitterId를 기반으로 emitter 를 저장
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(timeout));
// 전달이 완료되거나 emitter의 시간이 만료된 후 레포에서 삭제
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
// 에러를 방지하기 위해 더미 데이터를 전달
String eventId = makeTimeIncludeUd(memberId);
// 이벤트를 구분하기 위해 이벤트 ID 를 시간을 통해 구분해줌
sendNotification(emitter, eventId, emitterId, "EventStream Created. [userId=" + memberId + "]");
if (hasLostData(lastEventId)) {
sendLostData(lastEventId, memberId, emitterId, emitter);
}
return emitter;
}
아래는 sendNotification매서드를 이용해 클라이언트에게 보내주는 부분이다
private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(eventId)
.name("message")
.data(data));
} catch (IOException exception) {
emitterRepository.deleteById(emitterId);
}
}
연결할때는 data값을 String 인 "EventStream Created. [userId=" + memberId + "]" 을 사용했다.
이후 메세지를 보낼때는 data값에 dto를 이용해 보내준다.
프로젝트상 SSE가 연결이 되고나면 리뷰를 작성시에 작성자에게 알림을 보내주도록 구현해놨다.
리뷰를 작성하는 메서드에
notificationService.send(Member receiver, String content, String url) 을 추가해 줬다.
NotificationService 에 있는 send 메서드이다.
다음은
비동기식으로 처리하기위해 @Async 어노테이션을 사용했다.
조회하고 삭제하는 메서드도 구현했지만 기존의 CRUD와 동일해서 skip...
@Async
public void send(Member receiver, String content, String url){
Notification notification = notificationRepository.save(createNotification(receiver, content, url));
Long receiverId = receiver.getId();
String eventId = makeTimeIncludeUd(receiverId);
Map<String, SseEmitter> emitterMap = emitterRepository.findAllEmitterStartWithByMemberId(String.valueOf(receiverId));
emitterMap.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, emitter);
sendNotification(emitter, eventId, key, NotificationDto.create(notification));
}
);
}
아래는 유실된 데이터가있는지 확인하고 있다면 알림을 전송해주는 메서드 이다.
if (hasLostData(lastEventId)) {
sendLostData(lastEventId, memberId, emitterId, emitter);
}
// lastEventId 이후에 발생한 알림을 전송
private void sendLostData(String lastEventId, Long memberId, String emitterId, SseEmitter emitter) {
Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));
eventCaches.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
}
// lastEventId가 비어있는지 확인
private boolean hasLostData(String lastEventId) {
return !lastEventId.isEmpty();
}
사실 SSE같은 기술을 처음 쓰면 메서드 보단 환경설정 이나 개념에 대한 이해가 더 어려웠다.
어떻게 프론트로 넘겨주는거지? 라는 생각이 SSE를 이해하는데 어려웠던 부분이 었는데
간단했다. SseEmitter 클래스가 이를 해결해주고 있었다.
SSE는 Socket과 비교가 많이되는데 왜 알림에서는 SSE를 많이쓰는지 알아볼 필요가있다.