진행중인 프로젝트에서 사용자가 협업 요청을 받으면 해당 요청에 대한 실시간 알람을 받을 수 있는 기능을 구현해야 했다.
클라이언트 - 서버 간의 전달 방식
푸시 알림 (Push Notification)
모바일 환경에서 주로 사용되는 방식으로, 모바일 앱 중심의 FCM(구글 Firebase Cloud Messaging) 과 같은 기술을 사용한다.
내가 진행하는 프로젝트는 웹기반 프로젝트이기 때문에 해당 방식은 다음에 적용해보기로 하고 다른 방식을 찾아보았다.
웹소켓(WebSocket)
양방향 통신이 필요한 애플리케이션에서 사용되며, 클라이언트와 서버 간 지속적인 연결을 유지한다.
클라이언트와 지속적인 연결이 필요한 채팅이나 스트리밍 서비스에 적절하지만, 내가 도입하려는 실시간 요청 알림과는 적절하지 않은 방식이기에 패쓰!
폴링(Polling) 및 롱 폴링(Long Polling)
주기적으로 서버 요청을 보내는 방식
클라이언트가 일정한 간격으로 서버에 새로운 데이터가 있는지 확인하는 방식이다.
실시간 요청 알림과는 맞지 않아서 해당 방식도 쓰루했다.
SSE(Server-Sent Events)
서버에서 클라이언트로 단방향 데이터 스트리밍을 제공하는 기술로, HTTP 기반으로 동작한다.
Server-Sent Events(이하 SSE)는 HTTP 스트리밍을 통해 서버에서 클라이언트로 단방향의 Push Notification을 전송할 수 있는 HTML5 표준 기술이다.
실시간 알림 기능을 구현하는 데 있어 다양한 기술이 있지만, SSE를 선택한 이유는 다음과 같다.
1️⃣ 단방향 푸시에 최적화
SSE는 서버에서 클라이언트로 단방향 메시지 전송을 제공하므로 실시간 알림에 적합
클라이언트가 별도 요청을 보내지 않아도 서버에서 직접 데이터를 전송할 수 있음.
2️⃣ 간편한 구현
WebSocket보다 상대적으로 간단한 구현이 가능하며, HTTP 프로토콜을 그대로 활용할 수 있음.
브라우저에서 기본 지원되므로 별도의 라이브러리 설치가 필요 없음.
3️⃣ 연결 유지 및 자동 재연결
SSE는 기본적으로 HTTP 연결을 유지하며, 연결이 끊어질 경우 브라우저가 자동으로 재연결을 시도함.
재연결 시 Last-Event-ID를 활용하여 누락된 데이터를 보낼 수 있음.
4️⃣ HTTP/2와의 호환성
HTTP/1.1에서는 하나의 요청-응답 흐름이 하나의 TCP 연결에 종속되지만, HTTP/2에서는 하나의 TCP 연결을 유지하면서도 여러 개의 요청을 동시에 처리하기 때문에 HTTP/2를 사용하면 다중 스트리밍을 통해 SSE의 성능을 더욱 최적화할 수 있음.
서버에서는 EventSource를 통해서 날아오는 요청을 수신할 컨트롤러가 필요하다.
SSE 통신을 위해서는 MIME 타입을 text/event-stream으로 해줘야 한다.
컨트롤러의 subscribe 메소드는 요청 수신 시 서비스의 subscribe 메소드를 호출하도록 한다.
MIME (Multipurpose Internet Mail Extensions) 타입은 인터넷에서 파일이나 데이터가 전송될 때 콘텐츠 유형을 정의하는 표준 형식. 이를 통해 클라이언트(브라우저 등)가 수신한 데이터가 어떤 형식인지를 인식할 수 있다.
웹 개발에서는 HTTP 응답의 Content-Type 헤더에서 MIME 타입이 사용된다.
text/html → HTML 문서
application/json → JSON 데이터
image/png → PNG 이미지
@RestController
@RequestMapping("/api/notify")
@RequiredArgsConstructor
public class NotifyController {
private final NotificationServiceImpl notifyService;
private final JwtTokenProvider jwtTokenProvider;
@GetMapping(value = "/subscribe", produces = "text/event-stream")
public SseEmitter subscribe(@RequestHeader("Authorization") String authorizationHeader,
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
String token = authorizationHeader.replace("Bearer ", "");
Long memberId = Long.valueOf(jwtTokenProvider.getClaims(token).getSubject());
return notifyService.subscribe(memberId, lastEventId);
}
}
Last-Event-ID 는 SSE 연결이 끊겼을 경우, 클라이언트가 수신한 마지막 이벤트의 ID 값이다. 항상 있는 것은 아니기 때문에 false 로 설정해준다.
@Entity
@Getter
@Builder
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
public class Notification extends BaseEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private Boolean isRead;
private String message;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "member_id")
@OnDelete(action = OnDeleteAction.CASCADE)
private Member receiver;
}
알림 확인 여부와 알림을 받는 사용자의 아이디, 알림 메세지로 엔티티를 구성했다.
// subscribe 로 연결 요청 시 SseEmitter(발신기)를 생성.
@Override
public SseEmitter subscribe(Long memberId, String lastEventId) {
String emitterId = makeTimeIncludeId(memberId); // SseEmitter 를 구분,관리 하기 위한 식별자
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
// SseEmitter 의 완료/시간초과/에러로 인한 전송 불가 시 sseEmitter 삭제
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
// 503 에러를 방지하기 위한 더미 이벤트 전송
String eventId = makeTimeIncludeId(memberId); // 개별 알림 이벤트를 식별하기 위한 고유한 값
sendNotification(emitter, eventId, emitterId, "EventStream Created. [userId=" + memberId + "]");
// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
if (!lastEventId.isEmpty()) { // Last-Event-ID가 존재한다는 것은 받지 못한 데이터가 있다는 것이다.
sendLostData(lastEventId, memberId, emitterId, emitter);
}
return emitter;
}
private String makeTimeIncludeId(Long memebrId) { // 데이터 유실 시점 파악 위함
return memebrId + "_" + System.currentTimeMillis();
}
private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(eventId)
.data(data));
} catch (IOException exception) {
emitterRepository.deleteById(emitterId);
}
}
private void sendLostData(String lastEventId, Long memberId, String emitterId, SseEmitter emitter) {
Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));
eventCaches.entrySet().stream() // 클라이언트의 요청 헤더에 lastEventId 값이 있는 경우 lastEventId 보다 더 큰(더 나중에 생성된) emitter를 찾아서 발송
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
}
@Override
public void sendNotification(Long memberId, String message) {
Member receiver = memberRepository.findById(memberId).orElseThrow(() ->
new MemberHandler(ErrorStatus.MEMBER_NOT_FOUND));
Notification notification = Notification.builder()
.receiver(receiver)
.message(message)
.isRead(false)
.build();
String receiverId = String.valueOf(memberId);
String eventId = receiverId + "_" + System.currentTimeMillis();
Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByMemberId(receiverId);
emitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, notification);
sendNotification(emitter, eventId, key, notification.getMessage());
}
);
}
subscribe 메서드 : 클라이언트에서 서버로 요청이 왔을 때, SseEmitter 를 식별하기 위해서 사용자의 Id 를 포함한 고유한 아이디를 생성해준다.
각 아이디에 해당하는 SseEmitter를 등록 후 SseEmitter의 유효시간 동안 데이터가 전송되지 않으면 503에러 발생하기 때문에 더미데이터를 보내 이를 방지해주어야 한다.
그런 다음, 클라이언트가 수신 받지 못한 Event 목록이 존재할 경우 전송하여 event 유실을 예방한다.
sendNotification(Long memberId, String message) : 받는 사람의 멤버 아이디와 보낼 메세지 내용을 받아서 알림 객체를 생성한 후 알림을 받을 사용자가 구독 중인 모든 연결에 알림을 전송하고, 알림을 저장한다.
또한 알림 이벤트를 캐시하여 나중에 재전송할 수 있도록 저장한다.
private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) :
해당 메서드를 통해 SSE 이벤트 객체를 생성해서 클라이언트에게 실시간 알림을 전송한다.
@Repository
public class EmitterRepositoryImpl implements EmitterRepository{
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
@Override
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}
@Override
public void saveEventCache(String emitterId, Object event) {
eventCache.put(emitterId, event);
}
@Override
public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId){
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public void deleteById(String id) {
emitters.remove(id);
}
@Override
public void deleteAllEmitterStartWithId(String memberId) {
emitters.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
emitters.remove(key);
}
}
);
}
@Override
public void deleteAllEventCacheStartWithId(String memberId) {
eventCache.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
eventCache.remove(key);
}
}
);
}
}
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
emitters: 현재 활성화된 모든 SSE 연결(SseEmitter)을 관리하는 맵.
key = emitterId (사용자ID_시간)
value = SseEmitter (클라이언트와의 연결 객체)
eventCache: 보내지 못한 이벤트를 저장하는 캐시.
key = emitterId (사용자ID_시간)
value = Object (전송할 알림 데이터)
🛠 ConcurrentHashMap을 사용한 이유?
여러 스레드에서 동시 접근할 수 있도록 하기 위해 쓰레드 안전성(Thread-Safety)을 보장함.
SSE는 다중 연결을 지원하므로, 동시 요청이 많을 때도 안정적으로 작동하도록 함.
public SseEmitter save(String emitterId, SseEmitter sseEmitter);
SseEmitter 객체를 emitters 맵에 저장.
public void saveEventCache(String emitterId, Object event);
특정 사용자의 SseEmitter의 연결이 끊어졌을 경우, 해당 이벤트를 캐시에 저장
public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId);
특정 사용자의 ID로 시작하는 emitterId를 가진 모든 SseEmitter를 찾음.
반환된 Map<String, SseEmitter>을 사용하여 해당 사용자의 모든 연결로 알림을 보낼 수 있음.
public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId);
특정 사용자 ID로 시작하는 emitterId를 가진 이전 알림 이벤트 목록을 조회.
클라이언트가 네트워크 문제 등으로 알림을 받지 못했다면, 다시 연결될 때 해당 이벤트를 보낼 수 있음.
public void deleteById(String id);
특정 SseEmitter를 삭제하여 불필요한 연결을 정리.
클라이언트가 연결을 끊었을 때 실행됨.
public void deleteAllEmitterStartWithId(String memberId)
특정 사용자의 모든 SseEmitter를 삭제하여 불필요한 메모리 사용을 방지.
사용자가 로그아웃하거나 브라우저를 닫았을 때 실행될 수 있음.
public void deleteAllEventCacheStartWithId(String memberId)
특정 사용자와 관련된 모든 이벤트 캐시를 삭제.
클라이언트가 다시 연결될 필요가 없을 경우, 캐시된 알림을 정리하여 메모리 낭비를 방지.
@Component
@RequiredArgsConstructor
public class CollabAskListener {
private final NotificationService notificationService;
@TransactionalEventListener
@Async
public void handleCollabAskEvent(CollabAskEvent event) {
// 알림 전송 로직
String message = event.getClubName() + " 으로부터 '" + event.getCollabPostTitle() + "' 에 대하여 협업요청을 받았습니다.";
notificationService.sendNotification(event.getReceiverId(), message);
}
}
트랜잭션이 완료되었을 때만 이벤트를 처리
@TransactionalEventListener 사용 → 트랜잭션이 정상적으로 완료된 후 이벤트를 실행.
여러 개의 알림을 병렬로 처리
@Async를 사용하면 승인 요청이 끝난 후에도 다른 작업이 블로킹되지 않고 병렬로 처리됨.
이벤트 캐시를 ConcurrentHashMap 에 저장하게 되면 서버가 재시작되었을 때, 데이터가 사라지고, 분산 환경에서 동기화가 어렵다는 단점이 있다. 따라서 redis에 저장하는 방식으로 리팩토링 할 예정이다.