SSE(Server-Sent Events)는 웹 애플리케이션에서 서버로부터 데이터를 비동기적으로 전송받을 수 있는 기술 중 하나이다. 클라이언트의 별도의 요청이 없이도 알림처럼 실시간으로 서버에서 데이터를 전달해야할때가 있다. 이럴때 단방향으로 통신을 지원하며 서버로 데이터를 보낼수없다는 단점이 있지만, 실시간 업데이트가 필요할때는 효율적으로 데이터를 전달할 수 있다.
물론, SSE 방식외에도 클라이언트가 주기적으로 서버로 요청을 보내서 데이터를 받는 Short Polling , 서버의 변경이 일어날때까지 대기하는 Long Polling 방식이 있지만, 해당 프로젝트는 실시간으로 반영되어야하고 빈번하게 발생 될 수있는 상황이기에 SSE를 선택하였다. SSE는 서버와의 한번 연결을 하고나면 HTTP 1.1의 keep alive와 비슷하게 서버에서 변경이 일어날떄마다 데이터를 전송하는 방법이다.
Client 하나당 sseemitter하나
SSE의 기본적인 흐름은 클라이언트가 SSE요청을 보내면 서버에서는 클라이언트와 매핑되는 SSE 통신객체를 만든다(SseEmitter) 해당객체가 이벤트 발생시 eventsource를 client에게 전송하면서 데이터가 전달되는 방식이다. sseemitter는 SSE 통신을 지원하는 스프링에서 지원하는 API이다.
@RestController
@RequiredArgsConstructor
@RequestMapping("/v1/notification")
public class NotificationController {
private final NotificationService notificationService;
/**
* 로그인 한 유저의 Server-Sent Events(SSE) 연결을 관리합니다. SseEmitter를 사용하여 클라이언트에게 비동기적으로 이벤트를 보냅니다.
*
* @param userDetails 현재 인증된 사용자의 세부 정보
* @param lastEventId 마지막으로 수신한 이벤트의 ID (필수는 아님)
* @return SSE 연결을 위한 SseEmitter 객체
*/
@GetMapping(value = "/subscribe", produces = "text/event-stream")
public SseEmitter subscribe(@AuthenticationPrincipal UserDetailsImpl userDetails,
@RequestParam(value = "lastEventId", required = false, defaultValue = "") String lastEventId) {
// lastEventId : 마지막 이벤트 ID, 클라이언트가 마지막으로 수신한 이벤트를 식별하는데 사용 (필수는 아님)
return notificationService.subscribe(userDetails.getUser(), lastEventId);
}
/**
* 로그인한 사용자의 모든 알림을 조회합니다. ResponseEntity를 통해 알림 데이터와 HTTP 상태 코드를 함께 반환합니다.
*
* @param userDetails 현재 인증된 사용자의 세부 정보
* @return 사용자의 모든 알림을 포함하는 ApiResponse 객체
*/
@GetMapping("/notifications")
public ResponseEntity<ApiResponse> notifications(
@AuthenticationPrincipal UserDetailsImpl userDetails) {
NotificationsResponseDto responseDto = notificationService.findAllById(userDetails.getUser());
return ResponseEntity.ok()
.body(new ApiResponse<>("알림 조회에 성공하였습니다.", HttpStatus.OK.value(), responseDto));
}
/**
* 지정된 알림의 읽음 상태를 변경합니다. 성공적으로 변경되면 상태 메시지와 함께 HTTP 상태 코드를 반환합니다.
*
* @param notificationId 변경할 알림의 ID
* @return 알림 상태 변경에 대한 ApiResponse 객체
*/
@PatchMapping("/notifications/{notificationId}")
public ResponseEntity<ApiResponse> readNotification(@PathVariable Long notificationId) {
notificationService.readNotification(notificationId);
return ResponseEntity.ok()
.body(new ApiResponse<>("알림 읽음 상태 변경에 성공하였습니다.", HttpStatus.OK.value()));
}
/**
* 지정된 알림을 삭제합니다. 성공적으로 삭제ㄴ되면 상태 메시지와 함께 HTTP 상태 코드를 반환합니다.
*
* @param notificationId 변경할 알림의 ID
* @return 알림 상태 변경에 대한 ApiResponse 객체
*/
@DeleteMapping("/notifications/{notificationId}")
public ResponseEntity<ApiResponse> deleteNotification(@PathVariable Long notificationId) {
notificationService.deleteNotification(notificationId);
return ResponseEntity.ok()
.body(new ApiResponse<>("알림 읽음 상태 변경에 성공하였습니다.", HttpStatus.OK.value()));
}
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60; // SSE 연결의 기본 타임아웃 값 (60분)
private final EmitterRepository emitterRepository;
private final NotificationRepository notificationRepository;
private final ReviewRepository reviewRepository;
private final ReportRepository reportRepository;
private final RoomRepository roomRepository;
private final ChatRepository chatRepository;
// 사용자가 SSE를 통해 알림을 실시간으로 받을 수 있게 설정하는 메서드
public SseEmitter subscribe(User user, String lastEventId) {
Long userId = user.getId();
// 사용자 ID와 현재 시간을 조합해 고유한 emitter 식별자 생성
String id = userId + "_" + System.currentTimeMillis();
SseEmitter emitter = emitterRepository.save(id, new SseEmitter(DEFAULT_TIMEOUT));
// 연결이 완료되거나 타임아웃 되면 emitter를 레포지토리에서 제거
emitter.onCompletion(() -> emitterRepository.deleteById(id));
emitter.onTimeout(() -> emitterRepository.deleteById(id));
// 503 에러를 방지하기 위한(SSE 연결을 유지하기 위한) 더미 이벤트 전송
sendToClient(emitter, id, "EventStream Created. [userId=" + userId + "]");
// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
if (!lastEventId.isEmpty()) {
Map<String, Object> events = emitterRepository.findAllEventCacheStartWithId(
String.valueOf(userId));
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
}
return emitter;
}
// 사용자에게 새 알림을 생성하고 저장한 후, 해당 사용자의 모든 SSE 연결에 이 알림을 전송
@Transactional
public void send(User user, String type, Long entityId, User actionUser) {
// 새로운 createNotification 메서드를 사용하여 알림 생성
Notification notification = createNotification(user, type, entityId, actionUser);
// 알림 저장 및 전송 로직
saveAndSendNotification(notification, user.getId());
}
private void saveAndSendNotification(Notification notification, Long userId) {
// 생성된 알림 데이터베이스 저장
notificationRepository.save(notification);
// 사용자 ID를 기반으로 모든 SSE 연결 찾음
Map<String, SseEmitter> sseEmitters = emitterRepository.findAllStartWithById(
String.valueOf(userId));
// 각 SSE 연결에 대해, 생성된 알림을 캐시에 저장 후 클라이언트에게 전송
sseEmitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, notification);
sendToClient(emitter, key, new NotificationResponseDto(notification));
}
);
}
// 알림 생성
public Notification createNotification(User user, String entityType, Long entityId,
User actionUser) {
String url = "";
String content = "";
switch (entityType) {
case "REVIEW":
Optional<Review> optionalReview = reviewRepository.findById(entityId);
if (optionalReview.isPresent()) {
Review review = optionalReview.get();
if (actionUser != null && review.getUser() != null) {
url = "/productDetails?productId=" + review.getProduct().getId()
+ "&categoryId=" + review.getProduct().getCategory().getId();
content = actionUser.getNickname() + "님이 " + review.getUser().getNickname()
+ "님의 리뷰에 좋아요를 눌렀습니다.";
}
}
break;
case "REPORT":
Optional<Report> optionalReport = reportRepository.findById(entityId);
if (optionalReport.isPresent()) {
Report report = optionalReport.get();
if (report.getUser() != null) {
url = "/main/report";
String statusString = report.getStatus().name().toString();
switch (statusString) {
case "PROCEEDING":
content = report.getUser().getNickname() + "님이 제보한 상품 " + report.getName()
+ "가(이) 진행중입니다.";
break;
case "ADOPTED":
content = report.getUser().getNickname() + "님이 제보한 상품 " + report.getName()
+ "가(이) 채택되었습니다.";
break;
case "UN_ADOPTED":
content = report.getUser().getNickname() + "님이 제보한 상품 " + report.getName()
+ "가(이) 비채택되었습니다.";
break;
default:
content = "알 수 없는 상태입니다.";
}
}
}
break;
case "ROOM":
// 채팅방 ID를 기반으로 채팅방 정보 조회
Optional<Room> optionalRoom = roomRepository.findById(entityId);
if (optionalRoom.isPresent()) {
Room room = optionalRoom.get();
User partner = room.getPartner(user.getId()); // 상대방 정보
if (partner.getRole().equals(UserRoleEnum.ADMIN)) {
url = "/main/chat?userId=" + partner.getId();
} else {
url = "/admin/chat?userId=" + partner.getId();
}
content = partner.getNickname() + "님에게 문의가 왔습니다.";
}
break;
// 다른 케이스에 대한 처리...
default:
// 유효하지 않은 entityType인 경우
return null;
}
NotificationRequestDto requestDto = NotificationRequestDto.builder()
.user(user)
.content(content)
.url(url)
.entityType(entityType)
.entityId(entityId)
.isRead(false)
.build();
return new Notification(requestDto);
}
// SseEmitter 객체 사용하여 클라이언트에게 이벤트 전송하는 메서드
public void sendToClient(SseEmitter emitter, String id, Object data) {
try {
String jsonData = new ObjectMapper().writeValueAsString(data);
emitter.send(SseEmitter.event().id(id).name("sse").data(jsonData));
} catch (IOException exception) {
emitterRepository.deleteById(id);
log.error("SSE 연결 오류", exception);
}
}
// 모든 알림 조회
@Transactional
public NotificationsResponseDto findAllById(User user) {
List<NotificationResponseDto> responses = notificationRepository.findAllByUserId(user.getId())
.stream()
.map(NotificationResponseDto::new) // 생성자를 사용하여 객체 생성
.collect(Collectors.toList());
long unreadCount = responses.stream()
.filter(notification -> !notification.isRead())
.count();
return new NotificationsResponseDto(responses, unreadCount);
}
// 지정된 ID의 알림을 찾아 '읽음' 상태로 변경
@Transactional
public void readNotification(Long notificationId) {
Notification notification = notificationRepository.findById(notificationId)
.orElseThrow(() -> new ApiException("존재하지 않는 알림입니다.", HttpStatus.NOT_FOUND));
notification.read();
}
// 알림 서비스 내에 알림 삭제 메서드 추가
@Transactional
public void deleteNotification(Long notificationId) {
Notification notification = notificationRepository.findById(notificationId)
.orElseThrow(() -> new ApiException("존재하지 않는 알림입니다.", HttpStatus.NOT_FOUND));
notificationRepository.delete(notification);
}
// 알림 삭제
public void deleteNotificationByEntity(String entityType, Long entityId) {
List<Notification> notificationsToDelete = notificationRepository.findByEntityTypeAndEntityId(
entityType, entityId);
// 해당 항목(예: 리뷰 또는 제보)과 관련된 모든 알림을 삭제
notificationRepository.deleteAll(notificationsToDelete);
}
}
// SSE를 위한 emitter 객체와 관련된 이벤트 데이터를 관리하는 클래스
// 'Map'을 사용하여 인스턴스 이벤트 데이터를 메모리 내 관리하는 특정한 방식의 데이터 관리를 구현
@Repository
public class EmitterRepository {
// ID-KEY / SseEmitter객체-value, SSE 연결을 저장하고 관리
public final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
// 이벤트 데이터를 캐싱하는 데 사용 / 각 이벤트에는 고유한 ID 존재
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
// SseEmitter 객체 저장
public SseEmitter save(String id, SseEmitter sseEmitter) {
emitters.put(id, sseEmitter);
return sseEmitter;
}
// 이벤트 데이터를 캐시에 저장
public void saveEventCache(String id, Object event) {
eventCache.put(id, event);
}
// 주어진 ID로 시작하는 모든 SseEmitter 객체를 찾아 반환
public Map<String, SseEmitter> findAllStartWithById(String id) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(id))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// 주어진 ID로 시작하는 모든 이벤트 데이터를 찾아 반환
public Map<String, Object> findAllEventCacheStartWithId(String id) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(id))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// 주어진 ID로 시작하는 모든 SseEmitter 객체 삭제
public void deleteAllStartWithId(String id) {
emitters.forEach(
(key, emitter) -> {
if (key.startsWith(id)) {
emitters.remove(key);
}
}
);
}
나머지 dto와 repository 등은 구조는 다른 기능들과 비슷하다!!