
나는 알림기능 맡아서 구현하기로 했다.
일단 클라이언트와 서버 통신 방법부터 공부해야했다.
클라이언트 - 서버 통신 방법에는
폴링(Polling), 긴 폴링(Long Polling), 스트리밍(Streaming), 웹 소켓(WebSocket), SSE(Server-Sent-Events) 등이 있다
그중에 나는 SSE 방식으로 개발하기로 했다.
@Operation(summary = "sse세션연결")
@GetMapping(value = "/api/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> subscribe(@Parameter(hidden = true) @AuthenticationPrincipal UserDetailsImpl userDetails,
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
return ResponseEntity.ok(sseService.subscribe(userDetails.getMember(), lastEventId));
}
EventSource로 요청을 해오면 처리할 Controller이다.Last-Event-ID는 헤더에 담겨져 오는 값으로 이전에 받지 못한 이벤트가 존재하는 경우(SSE연결에 대한 시간 만료 혹은 종료)나 받은 마지막 이벤트 ID 값을 넘겨 그 이후의 데이터(받지 못한 데이터)부터 받을 수 있게 할때 필요한 값이다. public SseEmitter subscribe(Member member, String lastEventId) {
String emitterId = member.getUsername() + "_" + System.currentTimeMillis();
SseEmitter sseEmitter = sseRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
log.info("new emitter added : {}", sseEmitter);
log.info("lastEventId : {}", lastEventId);
/* 상황별 emitter 삭제 처리 */
sseEmitter.onCompletion(() -> sseRepository.deleteEmitterById(emitterId)); //완료 시, 타임아웃 시, 에러 발생 시
sseEmitter.onTimeout(() -> sseRepository.deleteEmitterById(emitterId));
sseEmitter.onError((e) -> sseRepository.deleteEmitterById(emitterId));
/* 503 Service Unavailable 방지용 dummy event 전송 */
send(sseEmitter, emitterId, createDummyNotification(member.getUsername()));
/* client가 미수신한 event 목록이 존재하는 경우 */
if(!lastEventId.isEmpty()) { //client가 미수신한 event가 존재하는 경우 이를 전송하여 유실 예방
Map<String, Object> eventCaches = sseRepository.findAllEventCacheStartsWithUsername(member.getUsername()); //id에 해당하는 eventCache 조회
eventCaches.entrySet().stream() //미수신 상태인 event 목록 전송
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> emitEventToClient(sseEmitter, entry.getKey(), entry.getValue()));
}
return sseEmitter;
}
String emitterId = member.getUsername() + "_" + System.currentTimeMillis();
subscribe()를 보면 id값을 member.getUsername()_System.currentTimeMillis() 형태로 사용하는 것을 볼 수 있다. 이렇게 사용하는 이유가 Last-Event-ID 헤더와 상관이 있다.Last-Event-ID헤더는 클라이언트가 마지막으로 수신한 데이터의 id값을 의미한다. Last-Event-Id값이 의미가 없어진다.member.getUsername()_System.currentTimeMillis() 형태로 두면 데이터가 유실된 시점을 파악할 수 있으므로 저장된 key값 비교를 통해 유실된 데이터만 재전송 할 수 있게 된다.member.getUsername()_System.currentTimeMillis() 로 사용한다.SseEmitter sseEmitter = sseRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
SseEmitter 객체를 만들어 반환해줘야함.SseEmitter 객체를 만들 때 유효 시간을 줄 수 있다. 이때 주는 시간 만큼 sse 연결이 유지되고, 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보내게 된다./* 503 Service Unavailable 방지용 dummy event 전송 */
send(sseEmitter, emitterId, createDummyNotification(member.getUsername()));
/* client가 미수신한 event 목록이 존재하는 경우 */
if(!lastEventId.isEmpty()) { //client가 미수신한 event가 존재하는 경우 이를 전송하여 유실 예방
Map<String, Object> eventCaches = sseRepository.findAllEventCacheStartsWithUsername(member.getUsername()); //id에 해당하는 eventCache 조회
eventCaches.entrySet().stream() //미수신 상태인 event 목록 전송
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> emitEventToClient(sseEmitter, entry.getKey(), entry.getValue()));
}
public interface SseRepository {
SseEmitter save(String emitterId, SseEmitter sseEmitter);
void saveEventCache(String eventCacheId, Object event);
Map<String, SseEmitter> findAllEmitterStartsWithUsername(String username);
Map<String, Object> findAllEventCacheStartsWithUsername(String username);
void deleteEmitterById(String id);
void deleteAllEmitterStartsWithId(String id);
void deleteAllEventCacheStartsWithId(String id);
}
save - Emitter를 저장한다.saveEventCache - 이벤트를 저장한다.findAllEmitterStartWithByUsername - 해당 회원과 관련된 모든 Emitter를 찾는다, 브라우저당 여러 개 연결이 가능하기에 여러 Emitter가 존재할 수 있다.findAllEventCacheStartWithByUsername - 해당 회원과 관련된 모든 이벤트를 찾는다.deleteEmitterById - Emitter를 지운다.deleteAllEmitterStartsWithId - 해당 회원과 관련된 모든 Emitter를 지운다.deleteAllEventCacheStartsWithId - 해당 회원과 관련된 모든 이벤트를 지운다.//...
/**
* [SSE 통신]specific user에게 알림 전송
*/
public void send(String receiver, String content, String type, String url) {
Notification notification = createNotification(receiver, content, type, url);
/* 로그인한 client의 sseEmitter 전체 호출 */
Map<String, SseEmitter> sseEmitters = sseRepository.findAllEmitterStartsWithUsername(receiver);
sseEmitters.forEach(
(key, sseEmitter) -> {
log.info("key, notification : {}, {}", key, notification);
sseRepository.saveEventCache(key, notification); //저장
emitEventToClient(sseEmitter, key, notification); //전송
}
);
}
/**
* [SSE 통신]dummy data 생성
* : 503 Service Unavailable 방지
*/
private Notification createDummyNotification(String receiver) {
return Notification.builder()
.notificationId(receiver + "_" + System.currentTimeMillis())
.receiver(receiver)
.content("send dummy data to client.")
.notificationType(NotificationType.NOTICE.getAlias())
.url(NotificationType.NOTICE.getPath())
.readYn('N')
.deletedYn('N')
.build();
}
/**
* [SSE 통신]notification type별 data 생성
*/
private Notification createNotification(String receiver, String content, String type, String url) {
if(type.equals(NotificationType.COMMENT.getAlias())) { //댓글
return Notification.builder()
.receiver(receiver)
.content(content)
.notificationType(NotificationType.COMMENT.getAlias())
.url(url)
.readYn('N')
.deletedYn('N')
.build();
} else if(type.equals(NotificationType.LIKEQUIZ.getAlias())) { //좋아요
return Notification.builder()
.receiver(receiver)
.content(content)
.notificationType(NotificationType.LIKEQUIZ.getAlias())
.url(url)
.readYn('N')
.deletedYn('N')
.build();
} else if(type.equals(NotificationType.NOTICE.getAlias())) { //공지
return Notification.builder()
.receiver(receiver)
.content(content)
.notificationType(NotificationType.NOTICE.getAlias())
.url(url)
.readYn('N')
.deletedYn('N')
.build();
} else {
return null;
}
}
/**
* [SSE 통신]notification type별 event 전송
*/
private void send(SseEmitter sseEmitter, String emitterId, Object data) {
try {
sseEmitter.send(SseEmitter.event()
.id(emitterId)
.name("sse")
.data(data, MediaType.APPLICATION_JSON));
} catch(IOException exception) {
sseRepository.deleteEmitterById(emitterId);
sseEmitter.completeWithError(exception);
}
}
/**
* [SSE 통신]
*/
private void emitEventToClient(SseEmitter sseEmitter, String emitterId, Object data) {
try {
send(sseEmitter, emitterId, data);
sseRepository.deleteEmitterById(emitterId);
} catch (Exception e) {
sseRepository.deleteEmitterById(emitterId);
throw new RuntimeException("Connection Failed.");
}
}
public void send(String receiver, String content, String type, String url)
private Notification createDummyNotification(String receiver)
private Notification createNotification(String receiver, String content, String type, String url)
private void send(SseEmitter sseEmitter, String emitterId, Object data)
private void emitEventToClient(SseEmitter sseEmitter, String emitterId, Object data)
location {
proxy_pass http://spring-server;
proxy_set_header Connection '';
proxy_set_header Cache-Control 'no-cache';
proxy_set_header X-Accel-Buffering 'no';
proxy_set_header Content_Type 'text/event-stream';
proxy_buffering off;
proxy_http_version 1.1;
chunked_transfer_encoding on;
proxy_read_timeout 86400s;
}
location /api/subscribe {
proxy_pass http://spring-server;
proxy_set_header Connection '';
proxy_set_header Cache-Control 'no-cache';
proxy_set_header X-Accel-Buffering 'no';
proxy_set_header Content_Type 'text/event-stream';
proxy_buffering off;
proxy_http_version 1.1;
chunked_transfer_encoding on;
proxy_read_timeout 86400s;
}
//Service
@Transactional
public void updateNotificationReadStatusByUsername(String username) {
notificationRepository.bulkReadUpdate(username);
}
//Repository
@Modifying
@Query("update Notification set readYn = 'Y' where receiver = :username")
void bulkReadUpdate(@Param("username") String username);
/**
* [SSE 통신]
*/
private void emitEventToClient(SseEmitter sseEmitter, String emitterId, Object data) {
try {
send(sseEmitter, emitterId, data);
} catch (Exception e) {
sseRepository.deleteEmitterById(emitterId);
throw new RuntimeException("Connection Failed.");
}
}