친구들과 함께하는 개인 프로젝트를 진행하던 중, 발생한 상황이 있다.
그건 바로 요청에 따라 지속적인 응답을 받아야 하는 상황
이었다.
여러 방법을 검토하던 중 SSE
를 적용하기로 했고, 블로그에 적어야겠다! 생각했다.
SSE는 서버의 데이터를 실시간으로 스트리밍하는 기술
이다. 서버의 변경된 데이터를 가져오기 위해서 요청을 보내거나 페이지를 새로고침 하는 등 클라이언트의 동작이 있어야만 했다. 물론 Polling, Websocket 등을 사용하는 방법도 있지만 이는 별도의 서버와 프로토콜로 통신하므로 비용이 많이 발생한다. 그래서 실시간 알림 등 서버에서 주체적으로 응답을 받아와야할 때는 SSE를 사용하면 BackEnd, FrontEnd 모두 쉽게 개발이 가능하고 , HTTP API만으로 구현할 수 있다.
SSE는 결국 서버의 단방향 통신
이다. 따라서 주로 데이터를 받기만 하면 되고, 반드시 실시간이지 않아도되는 알림 등
에서 사용된다.
주로 WebSocket
을 사용하고, 오디오, 영상 등의 데이터를 교환할 때는 WebRTC
를 사용한다. 그럼 WebSocket 과 WebRTC 를 간단하게 알아보자.
WebRTC
signaling 서버
가 필요 -> WebSocket 혹은 Socket.io를 사용해 구현
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* 알림 controller (with SSE)
* @author eunivverse
*/
@RestController
@RequestMapping("/notifications")
@RequiredArgsConstructor
public class NotificationController {
private final NotificationService notificationService;
/**
* SSE 구독
* @param id
* @return
*/
// SSE 이벤트를 구독하는 Controller 는 MediaType 을 `TEXT_EVENT_STREAM_VALUE` 로 해야한다
@GetMapping(value = "/subscribe/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(@PathVariable Long id) throws Exception {
return notificationService.subscribe(id);
}
/**
* SSE 데이터 전송
* @param id
*/
@PostMapping("/send-data/{id}")
public void sendData(@PathVariable Long id) {
notificationService.notify(id, "data");
}
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
// SSE 이벤트 연결 서버 타임아웃 시간
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final NotificationRepository notificationRepository;
/**
* 클라이언트가 구독을 위해 호출
*
* @param userId - 클라이언트의 사용자 아이디.
* @return SseEmitter - 서버에서 보낸 이벤트 Emitter
*/
public SseEmitter subscribe(Long userId) throws Exception {
try {
// 사용자 아이디를 기반으로 이벤트 Emitter 생성
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
// DB에 Emitter 저장
notificationRepository.save(userId, emitter);
// Emitter가 모든 데이터가 성공적으로 전송되었을 때 Emitter 삭제
emitter.onCompletion(() -> notificationRepository.deleteById(userId));
// Emitter가 지정된 시간동안 어떠한 이벤트도 전송되지 않았을 때 Emitter 삭제
emitter.onTimeout(() -> notificationRepository.deleteById(userId));
notify(userId, "EventStream Created. [userId=" + userId + "]");
return emitter;
} catch (Exception e) {
log.error("SSE subscription failed. exception = {}", e);
throw e;
}
}
/**
* 서버의 이벤트를 클라이언트에게 보내는 메서드
* 다른 서비스 로직에서 이 메서드를 사용해 데이터를 Object event 에 넣고 전송
*
* @param userId - 메세지를 전송할 사용자의 아이디.
* @param event - 전송할 이벤트 객체.
*/
public void notify(Long userId, Object event) {
SseEmitter emitter = notificationRepository.get(userId);
if (emitter != null) {
try {
// 클라이언트에게 데이터를 전송
emitter.send(SseEmitter.event().id(String.valueOf(userId)).name("event").data(event));
} catch (Exception e) {
// SSE 이벤트 전송 실패 시 로그 입력
log.error("SSE notify failed. exception = {}", e);
// DB에서 데이터 삭제
notificationRepository.deleteById(userId);
// emitter 에 error 전송
emitter.completeWithError(e);
}
}
}
}
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Repository
@RequiredArgsConstructor
public class NotificationRepository {
// 모든 Emitters를 저장하는 ConcurrentHashMap
private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
/**
* 주어진 아이디와 이미터를 저장
*
* @param id - 사용자 아이디.
* @param emitter - 이벤트 Emitter.
*/
public void save(Long id, SseEmitter emitter) {
emitters.put(id, emitter);
}
/**
* 주어진 아이디의 Emitter를 제거
*
* @param id - 사용자 아이디.
*/
public void deleteById(Long id) {
emitters.remove(id);
}
/**
* 주어진 아이디의 Emitter를 가져옴.
*
* @param id - 사용자 아이디.
* @return SseEmitter - 이벤트 Emitter.
*/
public SseEmitter get(Long id) {
return emitters.get(id);
}
}
// eventSource 생성
const eventSource = new EventSource('http://localhost:8080/notifications/subscribe/1');
// event 받아오기
eventSource.addEventListener('event', event => {
console.log(event);
// 이후 event 기반 알림 toast 보이도록 로직 구현 ...
});