Client 하나당 sseemitter하나
SSE의 기본적인 흐름은 클라이언트가 SSE요청을 보내면 서버에서는 클라이언트와 매핑되는 SSE 통신객체를 만든다(SseEmitter) 해당객체가 이벤트 발생시 eventsource를 client에게 전송하면서 데이터가 전달되는 방식이다. sseemitter는 SSE 통신을 지원하는 스프링에서 지원하는 API이다.
맨처음 클라이언트에서 SSE 요청이 오면 서버는 위 그림과 같이 기본적인 응답해더값과 더불어 필요한 헤더들을 반환해야한다. (초록 화살표)
@GetMapping(value = "/connect", produces = "text/event-stream")
@Operation(summary = "SSE 연결")
@ResponseStatus(HttpStatus.OK)
public ResponseEntity<SseEmitter> subscribe(@AuthenticationPrincipal UserDetailsImpl userDetails,
// @RequestHeader를 이용하여 header를 받아 데이터를 꺼내서 사용
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
return ResponseEntity.ok(notificationService.subscribe(userDetails.getUser(), lastEventId));
lastEventId를 파라미터로 받는 이유는 로그인 정보를 기반으로 미수신 event 유실을 예방하기 위해서이다.
→ 뒤에서 자세히 설명!
클라이언트로부터 SSE 연결 요청을 받아서, user정보와 HttpServletResponse값을 토대로 subscribe 메서드를 작성한다.
클라이언트의 sse연결 요청에 응답하기 위해서는 SseEmitter
객체를 만들어 반환해줘야한다. SseEmitter
객체를 만들 때 유효 시간을 줄 수 있다. 이때 주는 시간 만큼 sse 연결이 유지되고, 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보내게 된다. ( 위에서 설명했던 : 이를 방지하기 위한 것이 Last-Event-ID
헤더이다. 이 헤더는 클라이언트가 마지막으로 수신한 데이터의 id값을 의미한다. 이를 이용하여 유실된 데이터를 다시 보내줄 수 있다. )
id를 key로, SseEmitter
를 value로 저장해둔다. 그리고 SseEmitter
의 시간 초과 및 네트워크 오류를 포함한 모든 이유로 비동기 요청이 정상 동작할 수 없다면 저장해둔 SseEmitter
를 삭제한다.
// subscribe 로 연결 요청 시 SseEmitter(발신기)를 생성합니다.
public SseEmitter subscribe(User user, String lastEventId) {
String emitterId = makeTimeIncludeId(user);
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
// SseEmitter 의 완료/시간초과/에러로 인한 전송 불가 시 sseEmitter 삭제
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
// 클라이언트는 SSE Timeout 될 경우 자동으로 재연결 시도
// 재연결 시 한 번도 데이터를 전송한 적이 없다면 503 에러가 발생하므로 최초 연결 시 더미 이벤트를 전송
String eventId = makeTimeIncludeId(user);
sendToClient(emitter, emitterId, eventId,
"연결되었습니다. EventStream Created. [userId=" + user.getId() + "]");
// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
// 클라이언트의 요청 헤더에 lastEventId 값이 있는 경우 lastEventId 보다 더 큰(더 나중에 생성된) emitter를 찾아서 발송
if (!lastEventId.isEmpty()) { // Last-Event-ID가 존재한다는 것은 받지 못한 데이터가 있다는 것이다. (프론트에서 알아서 보내준다.)
Map<String, Object> events = emitterRepository.findAllEventCacheStartWithUserId(
String.valueOf(user.getId()));
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getKey(),
entry.getValue()));
}
return emitter;
}
각각 메소드를 살펴보자면...
makeTimeIncludeId()
private String makeTimeIncludeId(User user) { // 데이터 유실 시점 파악 위함
return user.getId() + "_" + System.currentTimeMillis();
}
데이터의 id값을 ${userId}_${System.currentTimeMillis()}
형태로 두면 데이터가 유실된 시점을 파악할 수 있으므로 저장된 key값 비교를 통해 유실된 데이터만 재전송 할 수 있게 된다.
Last-Event-Id = 3
{3, data1}
{3, data3}
{3, data2}
=> 어떤 데이터까지 제대로 전송되었는지 알 수 없다.
Last-Event-Id = 3_1631593143664
{3_1631593143664, data1}
{3_1831593143664, data3}
{3_1731593143664, data2}
=> data1 까지 제대로 전송되었고, data2, data3을 다시 보내야한다.
다음과 같이 3이라는 ID를 가진 회원의 이벤트 중 뒤의 시간을 기준으로 구분할 수 있게 된다.
sendToClient()
// 특정 SseEmitter 를 이용해 알림을 보냅니다. SseEmitter 는 최초 연결 시 생성되며,
// 해당 SseEmitter 를 생성한 클라이언트로 알림을 발송하게 됩니다.
public void sendToClient(SseEmitter emitter, String emitterId, String eventId, Object data) {
try {
emitter.send(SseEmitter.event()
.name("sse")
.id(eventId)
.data(data));
} catch (IOException exception) {
emitterRepository.deleteById(emitterId);
throw new BusinessException(ErrorCode.SSE_CONNECTION_ERROR);
}
}
send()
@Override
public void send(NotificationRequestDto requestDto) {
sendNotification(requestDto, saveNotification(requestDto));
}
// 알람 저장
private Notification saveNotification(NotificationRequestDto requestDto) {
Notification notification = Notification.builder()
.receiver(requestDto.getReceiver())
.notificationType(requestDto.getNotificationType())
.content(requestDto.getContent())
.url(requestDto.getUrl())
.isRead(false)
.build();
notificationRepository.save(notification);
return notification;
}
notification 테이블에 저장됨
// 알림 보내기
@Async
public void sendNotification(NotificationRequestDto request, Notification notification) {
String receiverId = String.valueOf(request.getReceiver().getId());
String eventId = receiverId + "_" + System.currentTimeMillis();
// 유저의 모든 SseEmitter 가져옴
Map<String, SseEmitter> emitters = emitterRepository
.findAllEmitterStartWithByUserId(receiverId);
emitters.forEach(
(key, emitter) -> {
// 데이터 캐시 저장 (유실된 데이터 처리 위함)
emitterRepository.saveEventCache(key, notification);
// 데이터 전송
sendToClient(emitter, key, eventId, NotificationResponseDto.of(notification));
}
);
}
save - Emitter를 저장한다.
saveEventCache - 이벤트를 저장한다.
findAllEmitterStartWithByMemberId - 해당 회원과 관련된 모든 Emitter를 찾는다.
◦ 브라우저당 여러 개 연결이 가능하기에 여러 Emitter가 존재할 수 있다.
findAllEventCacheStartWithByMemberId - 해당 회원과 관련된 모든 이벤트를 찾는다.
deleteById - Emitter를 지운다.
deleteAllEmitterStartWithId - 해당 회원과 관련된 모든 Emitter를 지운다.
deleteAllEventCacheStartWithId - 해당 회원과 관련된 모든 이벤트를 지운다.
PostServiceImpl - createPostLike()
private final NotificationService notificationService;
notificationService.notifyToUsersThatTheyHaveReceivedLike(postLike); // 게시글 좋아요 알람 추가
PostCommentServiceImpl - createPostComment()
private final NotificationService notificationService;
notificationService.notifyToUsersThatTheyHaveReceivedComment(postComment); // 게시글 댓글 알람 추가
게시글 작성자 아이디로 SSE 연결
작성자가 쓴 게시글에 좋아요 보내기
DB저장 + 알람 확인
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer { // 비동기 설정
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(3);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(50);
taskExecutor.setThreadNamePrefix("async-thread-");
taskExecutor.initialize();
return taskExecutor;
}
}