✔️Polling (client pull)
- 클라이언트가 일정한 주기로 서버에 업데이트 요청을 보내는 방식
- 지속적인 HTTP 요청이 발생하므로 리소스 낭비가 발생
✔️WebSocket (Server Push)
- 실시간 양방향 통신을 위한 스펙
- 서버와 브라우저가 지속적으로 연결된 TCP라인을 통해 실시간으로 데이터를 주고받는다
- 연결을 유지하며 클라이언트-서버 간 양방향 통신이 가능
- 주로 채팅, 게임, 주식 차트 등에 사용
✔️SSE (Server Push)
- 이벤트가 [서버 -> 클라이언트] 방향으로만 흐르는 단방향 통신
- 클라이언트가 주기적으로 HTTP요청을 보낼 필요가 없이 HTTP연결을 통해 서버에서 클라언트로 데이터 전달 가능

text/enet-stream이 표준으로 정해져 있다GET /connect HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
name:value필드로 구성되며 이들은 줄바꿈 문자 하나로 구분된다event: type1
data: An event of type1.
event: type2
data: An event of type2.
package techit.gongsimchae.domain.portion.notifications.repository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Repository
@Slf4j
public class EmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
/**
* SseEmitter 저장하는 메서드
*/
public SseEmitter save(String id, SseEmitter sseEmitter) {
emitters.put(id, sseEmitter);
return sseEmitter;
}
/**
* 데이터를 InMemory로 저장하는 메서드
*/
public void saveEventCache(String id, Object event) {
eventCache.put(id, event);
}
public Map<String, SseEmitter> findAllStartById(String id) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(id))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
public Map<String, Object> findAllEventCacheStartWithId(String id) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(id))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
public void deleteById(String id) {
emitters.remove(id);
}
public void deleteAllEmitterStartWithId(String id) {
emitters.forEach(
(key, emitter) -> {
if(key.startsWith(id)) {
emitters.remove(key);
}
}
);
}
public void deleteAllEventCacheStartWithId(String id) {
eventCache.forEach(
(key, event) ->{
if(key.startsWith(id)) {
eventCache.remove(key);
}
}
);
}
}
Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
private static final Long DEFAULT_TIMEOUT = 1000 * 60 * 5L;
private static final String INQUIRY_URL = "/mypage/inquiry/list";
private final NotificationRepository notificationRepository;
private final EmitterRepository emitterRepository;
private final UserRepository userRepository;
private final ObjectMapper objectMapper;
public SseEmitter subscribe(PrincipalDetails principalDetails, String lastEventId) {
Long userId = getCurrentUser(principalDetails).getId();
String id = userId + "_" + System.currentTimeMillis();
SseEmitter emitter = emitterRepository.save(id, new SseEmitter(DEFAULT_TIMEOUT));
emitter.onCompletion(() -> emitterRepository.deleteById(id));
emitter.onTimeout(() -> emitterRepository.deleteById(id));
// 오류방지를 위한 더미 이벤트 전송
sendToClient(emitter, id, new NotificationResponse("EventStream Created, [userId=" + id+"]"));
// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 방지
if (!lastEventId.isBlank()) {
Map<String, Object> events = emitterRepository.findAllEventCacheStartWithId(String.valueOf(userId));
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
}
return emitter;
}
/**
* 1:1 문의 답변이 왔을 때 알려주는 메서드
*/
@Transactional
public void alertAboutInquiry(User receiver, String content) {
Notifications notifications = Notifications.builder()
.user(receiver).isRead(0).url(INQUIRY_URL).content(content).notificationType(NotificationType.INQUIRY).build();
notificationRepository.save(notifications);
NotificationResponse notificationResponse = new NotificationResponse(notifications);
Map<String, SseEmitter> sseEmitters = emitterRepository.findAllStartById(String.valueOf(receiver.getId()));
log.debug("sseEmitters: {}", sseEmitters);
sseEmitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, notifications);
sendToClient(emitter, key, notificationResponse);
}
);
}
public NotificationResponse getAllNotifications(PrincipalDetails principalDetails) {
User user = userRepository.findByLoginId(principalDetails.getUsername()).orElseThrow(() -> new CustomWebException("not found user"));
List<NotificationRespDtoWeb> result = notificationRepository.findAllUserId(user.getId()).stream()
.map(NotificationRespDtoWeb::new).collect(Collectors.toList());
long unreadCount = result.stream().filter(n -> !n.getIsRead().equals(0)).count();
return new NotificationResponse(result, unreadCount);
}
/**
* 모든 알림창을 읽음표시로 해주는 메서드
*/
@Transactional
public void readNotification(PrincipalDetails principalDetails) {
User user = userRepository.findByLoginId(principalDetails.getUsername()).orElseThrow(() -> new CustomWebException("not found user"));
notificationRepository.findAllUnreadNotificationsByUser(user.getId()).forEach(Notifications::read);
}
private void sendToClient(SseEmitter emitter, String id, Object data) {
try{
emitter.send(SseEmitter.event()
.id(id)
.name("sse")
.data(objectMapper.writeValueAsString(data)));
log.debug("emitter {}, id = {}, data = {}",emitter,id,data);
} catch (IOException e) {
emitterRepository.deleteById(id);
log.error("SSE 연결 오류! ", e);
}
}
/**
* SecurityContext에서 유저를 찾는 메서드
*/
private User getCurrentUser(PrincipalDetails principalDetails) {
return userRepository.findByLoginId(principalDetails.getUsername()).orElseThrow(() -> new RuntimeException("User not found"));
}
}
const eventSource = new EventSource("http://localhost:8081/subscribe");
eventSource.addEventListener('sse', async function (event) {
console.log(event.data);
const data = JSON.parse(event.data);
console.log(data.content)
console.log(data.body)
console.log(data.title)
console.log(data)
if (data.content.startsWith('EventStream')) {
console.log("EventStream 데이터 무시");
return;
}
// 알림을 표시하는 함수
const showNotification = () => {
const notification = new Notification(data.notificationType, {
body: data.content
});
setTimeout(() => {
notification.close();
}, 10 * 1000); // 10초 후에 알림 닫기
notification.addEventListener('click', () => {
window.open(data.url, '_blank');
});
};
// 알림 권한 확인 및 요청
if (Notification.permission === 'granted') {
showNotification();
} else if (Notification.permission === 'default') {
const permission = await Notification.requestPermission();
if (permission === 'granted') {
showNotification();
}
}
});
// SSE 연결 오류 처리
eventSource.addEventListener('error', function (event) {
console.error('SSE 연결 오류:', event);
});